diff --git a/CLAUDE.md b/CLAUDE.md index 2ac23d8..3f55748 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -44,7 +44,7 @@ uv run pytest tests/job1/unit_test.py::test_enrich_orders The detailed specs live in [`specs/`](specs/) — read the relevant one **before** working in that area: - [`specs/architecture.md`](specs/architecture.md) — execution flow, CLI surface, key classes, jobs DAG, job **generation** (`scripts/sdk_generate_template_job.py` → `resources/jobs.yml`; never hand-edit), CI/CD, job-level params, deploy-time env vars, logging, production guardrails, adding a new job. -- [`specs/data-model.md`](specs/data-model.md) — catalog/schema isolation, medallion flow, table schemas, price freeze, liquid clustering, DQX/quarantine, lineage. +- [`specs/data-model.md`](specs/data-model.md) — plain-words pipeline overview, catalog/schema isolation, medallion flow, table schemas, **field naming conventions**, product-name freeze, liquid clustering, DQX/quarantine, lineage. - [`specs/test-plan.md`](specs/test-plan.md) — unit / integration / load tests. - [`specs/tooling.md`](specs/tooling.md) — MCP servers (Databricks, AWS billing/docs, context7), CLI, and skills: what to reach for and when. - The AI/BI dashboard (`resources/orders_dashboard.lvdash.json`) is committed and edited directly; the catalog is `${var.catalog}`, resolved at deploy time. @@ -52,7 +52,7 @@ The detailed specs live in [`specs/`](specs/) — read the relevant one **before ### Load-bearing invariants (keep in mind; full detail in specs) - **Catalog-level isolation** — env separation is at the *catalog* level (`dev_` / `staging` / `prod`); the same medallion schemas (`external_source`/`raw`/`curated`/`report`/`ops`) exist in each. Staging/prod catalogs+schemas are owned by `make init`, not the runtime wheel. -- **Price freeze** — silver freezes `line_revenue` at sale time: batch via an insert-only `MERGE`, SDP via a **streaming table** (a materialized view would *restate* the price; a streaming table appends once and *freezes*). `total_value` in gold is `SUM(line_revenue)`. +- **Product-name freeze** — silver freezes `product_name` onto each order line at sale time: batch via an insert-only `MERGE`, SDP via a **streaming table** (a materialized view would *restate* the name; a streaming table appends once and *freezes*). A later rename never relabels booked orders. `unit_price` is static; `total_value` in gold is `SUM(item_total)`. - **Generated job config** — `resources/jobs.yml` is generated by `scripts/sdk_generate_template_job.py`; never hand-edit it (it's gitignored). - **Schema-drift guard** — all medallion writes use `overwriteSchema=false` (the only exception is `ops._health`). @@ -81,4 +81,5 @@ Favor solutions with less code, fewer classes, and fewer abstractions. When two - Don't add `CREATE CATALOG` or `CREATE SCHEMA` calls outside the `args.env == "dev"` branch in `config.py`. Staging/prod catalogs and schemas are owned by `make init`; runtime jobs run without those privileges. - Don't commit `resources/jobs.yml` (gitignored — regenerated on every deploy). - Don't commit `.databricks-resources.json` (gitignored — local provisioning state, diverges per developer). +- Don't commit or hand-edit `resources/orders_dashboard_deploy.lvdash.json` (gitignored — regenerated from `orders_dashboard.lvdash.json` on every deploy with `${var.catalog}` resolved, since DABs can't substitute bundle vars inside `.lvdash.json` content). Edit the source `orders_dashboard.lvdash.json` instead. - Don't hand-edit `resources/jobs.yml` — it is overwritten on every deploy. Change `scripts/sdk_generate_template_job.py` instead (it generates jobs, the SDP pipeline, and the dashboard resource stanza into `resources/jobs.yml`). `resources/orders_dashboard.lvdash.json` is committed and editable directly. diff --git a/README.md b/README.md index 96aa2d2..ce0a5d2 100644 --- a/README.md +++ b/README.md @@ -66,7 +66,7 @@ This project template demonstrates how to: Deep technical detail lives in [`specs/`](specs/) (the README stays a landing page): - [**Architecture**](specs/architecture.md) — wheel/CLI surface, jobs DAG, job generation, CI/CD, job-level params, deploy-time env vars, logging, production guardrails, folder structure. -- [**Data model**](specs/data-model.md) — catalog/schema isolation, medallion data flow (diagram), table schemas, price-freeze semantics, liquid clustering, DQX/quarantine, lineage. +- [**Data model**](specs/data-model.md) — catalog/schema isolation, medallion data flow (diagram), table schemas, product-name freeze semantics, liquid clustering, DQX/quarantine, lineage. - [**Test plan**](specs/test-plan.md) — unit, integration, and load tests. - [**Tooling**](specs/tooling.md) — MCP servers (Databricks, AWS billing/docs, context7), the Databricks CLI, and the bundled skills. diff --git a/resources/orders_dashboard.lvdash.json b/resources/orders_dashboard.lvdash.json index 80ddd75..e66d2aa 100644 --- a/resources/orders_dashboard.lvdash.json +++ b/resources/orders_dashboard.lvdash.json @@ -4,7 +4,7 @@ "name": "ds_orders", "displayName": "Orders", "queryLines": [ - "SELECT order_date, country, customer_name AS customer, product_name AS product, category_name AS category, SUM(total_value) AS total_value, SUM(total_orders) AS total_orders FROM ${var.catalog}.report.order_agg WHERE order_date BETWEEN :date_range.min AND :date_range.max GROUP BY 1, 2, 3, 4, 5" + "SELECT a.order_date, a.country, a.customer_name AS customer, l.product_name AS product, a.product_name AS product_name, a.category_name AS category, SUM(a.total_value) AS total_value, SUM(a.total_orders) AS total_orders FROM ${var.catalog}.report.order_agg a JOIN (SELECT product_id, product_name FROM (SELECT product_id, product_name, ROW_NUMBER() OVER (PARTITION BY product_id ORDER BY order_date DESC, product_name DESC) AS rn FROM ${var.catalog}.report.order_agg) WHERE rn = 1) l ON a.product_id = l.product_id WHERE a.order_date BETWEEN :date_range.min AND :date_range.max GROUP BY 1, 2, 3, 4, 5, 6" ], "parameters": [ { diff --git a/scripts/sdk_drop_tables.py b/scripts/sdk_drop_tables.py index 9d20d6f..97a9141 100644 --- a/scripts/sdk_drop_tables.py +++ b/scripts/sdk_drop_tables.py @@ -7,10 +7,15 @@ from _sdk_sql import get_warehouse_id, run_sql from template.config import MEDALLION_SCHEMAS -# DROP must match the object kind: a materialized view / streaming table / view cannot be -# removed with DROP TABLE (it raises a type-mismatch error that IF EXISTS does NOT suppress). -# The SDP pipeline materializes MVs and streaming tables in the raw/curated/report schemas, -# so a blind DROP TABLE loop aborts mid-way and leaves the catalog half-dropped. +# DROP must match the object kind: a materialized view / view cannot be removed with +# DROP TABLE (it raises a type-mismatch error that IF EXISTS does NOT suppress). The SDP +# pipeline materializes MVs and streaming tables in the raw/curated/report schemas, so a +# blind DROP TABLE loop aborts mid-way and leaves the catalog half-dropped. +# +# Some SQL warehouse channels also reject the `DROP STREAMING TABLE` grammar with a +# PARSE_SYNTAX_ERROR (while accepting a plain DROP TABLE on the streaming table). So we try +# the kind-specific statement first and fall back to DROP TABLE — _drop_candidates() always +# appends DROP TABLE as the last resort. _DROP_STMT = { TableType.MATERIALIZED_VIEW: "DROP MATERIALIZED VIEW IF EXISTS", TableType.STREAMING_TABLE: "DROP STREAMING TABLE IF EXISTS", @@ -19,6 +24,17 @@ } +def _drop_candidates(table_type) -> list[str]: + """DROP statements to try in order. The kind-specific form first (when there is one), + then plain DROP TABLE as a fallback for warehouses whose parser rejects the + kind-specific grammar (notably DROP STREAMING TABLE).""" + specific = _DROP_STMT.get(table_type) + candidates = [specific] if specific else [] + if "DROP TABLE IF EXISTS" not in candidates: + candidates.append("DROP TABLE IF EXISTS") + return candidates + + def _resolve_catalog(workspace: WorkspaceClient, env: str) -> str: if env == "dev": local_part = workspace.current_user.me().user_name.split("@")[0] @@ -56,8 +72,17 @@ def main(): count = 0 for schema in MEDALLION_SCHEMAS: for table in workspace.tables.list(catalog_name=catalog, schema_name=schema): - stmt = _DROP_STMT.get(table.table_type, "DROP TABLE IF EXISTS") - run_sql(workspace, warehouse_id, f"{stmt} `{catalog}`.`{schema}`.`{table.name}`") + fq = f"`{catalog}`.`{schema}`.`{table.name}`" + last_err: Exception | None = None + for stmt in _drop_candidates(table.table_type): + try: + run_sql(workspace, warehouse_id, f"{stmt} {fq}") + last_err = None + break + except Exception as e: # try the next candidate (e.g. parser rejects DROP STREAMING TABLE) + last_err = e + if last_err is not None: + raise last_err count += 1 print(f"Done. {count} object(s) dropped.") diff --git a/scripts/sdk_generate_template_job.py b/scripts/sdk_generate_template_job.py index b677a84..8bf294c 100644 --- a/scripts/sdk_generate_template_job.py +++ b/scripts/sdk_generate_template_job.py @@ -239,11 +239,12 @@ def _build_job(environment: str, sp_id: str | None) -> dict: ] ) - schedule = None + # No standalone schedule on the batch job: in prod it is triggered solely by + # job1_prod_integration (via its `run` RunJobTask), so the orchestrator is the + # single entry point. Prod still gets failure/duration alerting below. email_notifications = None health = None if environment == "prod": - schedule = CronSchedule(quartz_cron_expression="0 0 5 * * ?", timezone_id="UTC") email_notifications = JobEmailNotifications( on_failure=_alert_emails(), on_duration_warning_threshold_exceeded=_alert_emails(), @@ -264,8 +265,8 @@ def _build_job(environment: str, sp_id: str | None) -> dict: name=f"{JOB_NAME}_${{bundle.target}}", timeout_seconds=3600, # max_concurrent_runs=1 + queue.enabled=True: if a run is already in flight - # when the next scheduled tick arrives, queue it rather than silently - # skipping. Skipping a scheduled prod run is almost never what you want. + # when job1_prod_integration triggers this job again, queue it rather than + # silently skipping. Skipping a prod run is almost never what you want. max_concurrent_runs=1, queue=QueueSettings(enabled=True), # Suppress alerts for runs that were manually cancelled or skipped by an @@ -286,7 +287,6 @@ def _build_job(environment: str, sp_id: str | None) -> dict: ], tags=_tags(environment), environments=_environments(), - schedule=schedule, email_notifications=email_notifications, health=health, tasks=tasks, diff --git a/specs/CHANGELOG.md b/specs/CHANGELOG.md index 99d25ed..e4b9976 100644 --- a/specs/CHANGELOG.md +++ b/specs/CHANGELOG.md @@ -2,6 +2,12 @@ --- +## [#43] · feat/freeze-product-name · 2026-06-23 · feat: freeze product_name instead of line_revenue + +Removed the synthetic `line_revenue`/`unit_price_at_sale` columns — gold `total_value` is now `SUM(item_total)` (the line value the source already freezes on the order) — and re-pointed the silver insert-only-MERGE / streaming-table freeze at the mutable `product_name`, which the seed now changes by renaming 2 products per run (`Product N` → `Product N.k`); `unit_price` stays as a static attribute. The AI/BI "by product" chart and Product filter both identify a product by its latest name (consolidating by `product_id`, one line per physical product across renames, and filtering shows the full pre-/post-rename history); the frozen historical names remain in `report.order_agg` for audit. Also removed the batch `job1`'s standalone prod schedule (the SDP pipeline already had none) so `job1_prod_integration` is the single prod trigger orchestrating seed → batch + SDP, fixed `sdk_drop_tables.py` to fall back to `DROP TABLE` when a warehouse's parser rejects `DROP STREAMING TABLE`, and updated unit/integration tests, schemas, the SDP pipeline, and docs. + +--- + ## [#42] · docs/reorg-specs-tooling · 2026-06-23 · docs: rename docs/→assets/, consolidate tooling into specs/tooling.md, slim CLAUDE.md Renamed the image-only `docs/` folder to `assets/` (it held no prose, only screenshots and the CI/CD draw.io export) and repointed every `` reference in the README and specs. Added `specs/tooling.md` consolidating all four MCP servers (Databricks, aws-billing-cost, aws-documentation, context7), the Databricks CLI, and the bundled skills, then trimmed `CLAUDE.md`'s tooling section to a short every-session decision list that points at it. Moved the repo folder-structure tree out of `architecture.md` into the `specs/README.md` routing hub and flagged `CHANGELOG.md` there as append-only (don't read for context). diff --git a/specs/README.md b/specs/README.md index 4b557a2..6a7eae7 100644 --- a/specs/README.md +++ b/specs/README.md @@ -8,7 +8,7 @@ reference. Read the relevant spec before working in that area. | Spec | Read it when you're touching… | |---|---| | [architecture.md](architecture.md) | the wheel/CLI surface, jobs DAG, job generation, CI/CD, job-level params, deploy-time env vars, logging, or production guardrails. | -| [data-model.md](data-model.md) | the catalog/schema model, medallion flow, table schemas, the price-freeze semantics, liquid clustering, DQX/quarantine, or lineage. | +| [data-model.md](data-model.md) | the catalog/schema model, medallion flow, table schemas, the product-name freeze semantics, liquid clustering, DQX/quarantine, or lineage. | | [test-plan.md](test-plan.md) | unit, integration, or load tests. | | [tooling.md](tooling.md) | MCP servers (Databricks, AWS, context7), the Databricks CLI, and the bundled skills — what to reach for and when. | | [CHANGELOG.md](CHANGELOG.md) | the per-PR change history. **Append-only — add an entry before every merge; don't read it for context.** | diff --git a/specs/architecture.md b/specs/architecture.md index a4dec60..903015c 100644 --- a/specs/architecture.md +++ b/specs/architecture.md @@ -44,7 +44,9 @@ does not expose custom env vars to the process). ## Jobs DAG The batch job (`job1`) runs as a Lakeflow Job DAG; the declarative path (`job1_sdp`) runs the same -ETL as a Spark Declarative Pipeline. +ETL as a Spark Declarative Pipeline. In **prod**, neither has its own schedule: `job1_prod_integration` +is the sole scheduled entry point (daily, 06:00 America/Sao_Paulo) — it runs `seed_sources`, then +triggers the batch `job1` (`RunJobTask`) and the `job1_sdp` pipeline (`PipelineTask`) in parallel. diff --git a/specs/data-model.md b/specs/data-model.md index e480aa9..facdad4 100644 --- a/specs/data-model.md +++ b/specs/data-model.md @@ -1,9 +1,29 @@ # Data model -The medallion layout, the catalog/schema isolation model, table schemas, the price-freeze +The medallion layout, the catalog/schema isolation model, table schemas, the attribute-freeze semantics, liquid clustering, and data quality. For how tasks/jobs are wired see [architecture.md](architecture.md). +## The pipeline in plain words + +It pulls the source data from `external_source` — customers, products, orders, and the order +items. **Bronze** (`raw`) is a faithful copy; **silver** (`curated.order_enriched`) joins them into +one row per order line, looking up the product to attach its name and category. Each order line +already carries its own `item_total` — the money for that line, fixed by the source when the order +was placed — so revenue is never recomputed from the current price list. **Gold** +(`report.order_agg`) rolls those lines up per customer / day / product, and `total_value` is just +the sum of those frozen `item_total`s. + +Products can be **renamed** over time (the daily seed renames a couple — `"Product 1"` → +`"Product 1.1"`). Silver freezes the name onto each order line *as it is processed*, so a rename +never rewrites history: orders booked before the change keep the old name, orders after it get the +new one — both live side by side in gold for the same `product_id`. The **dashboard** identifies a +product by its *latest* name: the line chart **consolidates by `product_id`** (so a renamed product +stays one line, labeled with the current name), and the Product **filter** lists each product once +by that current name — selecting it shows the product's *full* history, pre- and post-rename. The old +name is never lost: gold keeps the frozen `product_name` on every row, so the rename is fully +auditable in the data even though the chart and filter present the single current label. + ## Catalog / schema model (load-bearing) **Environment isolation is at the *catalog* level, not the schema level.** The same medallion @@ -127,21 +147,42 @@ Canonical schemas live in `commonSchemas.py` (`order_enriched_schema`, `order_ag - **`curated.order_enriched`**: `customer_name, country, customer_id, order_id, order_total, order_date (DateType), product_id, product_name, product_category_id, category_name, item_seq, - item_description, item_quantity, item_total, line_revenue, unit_price_at_sale`. + item_description, item_quantity, item_total`. - **`report.order_agg`**: `customer_name, country, order_date (DateType), product_id, product_name, product_category_id, category_name, total_quantity, total_value, total_orders`. -`total_value` in gold is `SUM(line_revenue)`, **not** `SUM(item_total)`. `line_revenue = -item_quantity × unit_price_at_sale`, computed in silver by joining the `external_source.product` -dimension and **frozen** at first processing (see below). `product_name` (`"Product 1"`) and +`total_value` in gold is `SUM(item_total)` — the line value the source froze on the order at sale +time, so a later price change never restates historical revenue. `product_name` (`"Product 1"`) and `category_name` (`"Category 2"`, derived as `concat('Category ', product_category_id)`) are human-readable labels carried alongside the numeric ids; the dashboard displays the labels. - -## Incremental silver: price freeze (load-bearing) - -`external_source.product` is a mutable dimension — the daily seed bumps `unit_price` for a few -products each run. The pipeline freezes the price at sale time so a later price change never -restates already-booked revenue. **Both** pipelines freeze, by different mechanisms: +`product_name` is **frozen** per row at first processing (see below) — it is the mutable attribute the +freeze pattern is demonstrated against. + +### Field naming conventions + +Medallion field names follow a small set of rules (canonical schemas in `commonSchemas.py` are the +source of truth): + +- **`{entity}_id` suffix** for identifiers — `customer_id`, `product_id`, `order_id`, + `product_category_id`. +- **Entity-qualified names** — `customer_name`, `product_name`, `category_name`, `order_date`, + `order_total` (not bare `name` / `date` / `total` once past the source layer). +- **`item_*` prefix** for order-item-level fields — `item_seq`, `item_description`, `item_quantity`, + `item_total`. +- **`total_*` prefix** for gold aggregates — `total_quantity`, `total_value`, `total_orders`. +- **No abbreviations** — spell words out (`quantity`, not `qty`; `description`, not `desc`); the raw + source columns (`qty`, `desc_item`, `total_item`) are renamed at the silver boundary. +- **`DateType` for dates** — `order_date` is cast from the source string to `DateType` in silver. +- **`_sdp` suffix** for the declarative-pipeline table variants (`order_enriched_sdp`, + `order_agg_sdp`, `product_sdp`, …) so the batch and SDP outputs never collide in the same schema. + +## Incremental silver: product-name freeze (load-bearing) + +`external_source.product` is a mutable dimension — the daily seed renames a couple of products each +run (`"Product 1"` → `"Product 1.1"` → `"Product 1.2"`, suffix = cumulative rename count). The +pipeline freezes `product_name` onto each order line at sale time, so a later rename never relabels +already-booked orders (`unit_price` is a static attribute and `total_value = SUM(item_total)`, so +revenue is never restated either). **Both** pipelines freeze, by different mechanisms: - **`job1` (batch)** — `generate_orders` does first-run-full / incremental-after: first run (silver empty) overwrites all backfilled orders; every subsequent run enriches only `date = seed_date` @@ -150,15 +191,19 @@ restates already-booked revenue. **Both** pipelines freeze, by different mechani order_date = DATE''`. - **`job1_sdp` (declarative)** — silver (`curated.order_enriched_sdp`) and bronze `raw.order_item_sdp` are **streaming tables** (`@dp.table` + `spark.readStream`). A stream–static join (streaming - `order_item` fact ⨝ static dims) appends each row once and never reprocesses it, so `line_revenue` - is frozen on append. **A materialized view would restate** the price on every refresh — that is + `order_item` fact ⨝ static dims) appends each row once and never reprocesses it, so `product_name` + is frozen on append. **A materialized view would restate** the name on every refresh — that is why silver had to become a streaming table. Gold stays an MV because it re-sums already-frozen silver. Why an MV restates but a streaming table freezes: an MV is *defined as a query over current inputs* and recomputes from scratch (latest-wins); a streaming table consumes new input rows once and -appends. "Incremental" (Enzyme re-reading only changed files) is efficiency, not semantics. Known -limitations (acceptable for a template): the first-run backfill freezes at the *current* price; -freeze is at *processing* time, not strictly *order date*; country is frozen at append time in SDP. +appends. "Incremental" (Enzyme re-reading only changed files) is efficiency, not semantics. The +dashboard consolidates the "by product" chart by `product_id` (labeled with each product's latest +name) and the Product filter selects by that same latest name, so a renamed product stays one line +and filtering it shows its full pre-/post-rename history; the frozen historical names remain in +`report.order_agg` for audit. Known limitations (acceptable for a template): the first-run backfill +freezes the *current* name; freeze is at *processing* time, not strictly *order date*; country is +frozen at append time in SDP. ## Liquid clustering diff --git a/specs/test-plan.md b/specs/test-plan.md index df755df..45636d6 100644 --- a/specs/test-plan.md +++ b/specs/test-plan.md @@ -25,9 +25,9 @@ Coverage: `make unit-test` writes a report under `coverage_reports/` (uploaded a | `test_arg_parser` | CLI parsing; `--task` choices derive from `TASKS`. | | `test_config` | `Config` resolves catalog/values per env. | | `test_validate_orders_from_source` | DQX split — valid rows pass, bad rows quarantine. | -| `test_enrich_orders` | silver enrichment + `line_revenue = qty × unit_price_at_sale`. | -| `test_aggregate_orders` | gold aggregation; `total_value = SUM(line_revenue)`. | -| `test_seed_sources_*` (6) | seed incremental order/item/customer/price schemas; price updates deterministic per date; incremental ids unique across days. | +| `test_enrich_orders` | silver enrichment (joins + frozen `product_name`). | +| `test_aggregate_orders` | gold aggregation; `total_value = SUM(item_total)`. | +| `test_seed_sources_*` (7) | seed incremental order/item/customer/name schemas; name updates deterministic per date and follow the `Product .` suffix; incremental ids unique across days. | | Test (SDP — `unit_test_sdp.py`) | Asserts | |---|---| @@ -50,14 +50,14 @@ generated by `_build_job_integration_test` in `sdk_generate_template_job.py` and ### Standard mode Seeds 2 customers, 3 products, and orders that deliberately include DQX edge cases (a `total > 1000` -WARN row, a null `id`, duplicate `id`s). Validate expects exactly **2 gold rows** with frozen -`total_value`: John Doe `2×$10 + 1×$10 = $30`, Jane Smith `3×$20 = $60`. +WARN row, a null `id`, duplicate `id`s). Validate expects exactly **2 gold rows** with +`total_value = SUM(item_total)`: John Doe `$50 + $50 = $100`, Jane Smith `$151`. ## Load tests Same `setup`/`validate` tasks with `load_test=true`, exercising both the initial bulk load and incremental daily updates at production scale. Seeds **500 customers × 100 products**, **2M orders**, -**6M order_items** (3 items/order). With `unit_price=25` and `qty=2`, each (customer, product) group +**6M order_items** (3 items/order). With `item_total=50` and `qty=2`, each (customer, product) group lands at `total_quantity=240`, `total_value=6,000.0`, `total_orders=40`. Validate expects **50,000 gold rows** (500 × 100) in both `report.order_agg` and `report.order_agg_sdp`, and fails if any row deviates from those expected aggregates. diff --git a/src/template/commonSchemas.py b/src/template/commonSchemas.py index e857955..bdc9eac 100644 --- a/src/template/commonSchemas.py +++ b/src/template/commonSchemas.py @@ -37,11 +37,11 @@ ] ) -# Product dimension. unit_price is mutable: the daily seed bumps prices over time, -# which is what makes the silver "freeze at sale time" vs "restate" distinction -# observable. line_revenue downstream = qty * unit_price captured when the order -# row is first processed. `name` carries the human-readable label ("Product 1"). -# category_id/category_name are stable product attributes (not on the order). +# Product dimension. `name` is mutable: the daily seed renames a couple of products +# over time (e.g. "Product 1" → "Product 1.1"), which is what makes the silver +# "freeze at sale time" vs "restate" distinction observable — silver freezes the +# product_name onto each order line at processing time. unit_price is a static +# attribute (set once, never bumped). category_id/category_name are stable too. product_schema = StructType( [ StructField("product_id", IntegerType(), True), @@ -68,9 +68,6 @@ StructField("item_description", StringType(), True), StructField("item_quantity", IntegerType(), True), StructField("item_total", FloatType(), True), - # line_revenue = item_quantity * unit_price-at-sale, frozen at first processing. - StructField("line_revenue", DoubleType(), True), - StructField("unit_price_at_sale", FloatType(), True), ] ) @@ -84,7 +81,7 @@ StructField("product_category_id", IntegerType(), True), StructField("category_name", StringType(), True), StructField("total_quantity", LongType(), True), - # total_value is now SUM(line_revenue), not SUM(item_total). + # total_value is SUM(item_total) — the line value frozen on the order at sale time. StructField("total_value", DoubleType(), True), StructField("total_orders", LongType(), True), ] diff --git a/src/template/job1/extract_source1.py b/src/template/job1/extract_source1.py index c085192..c7e2c98 100644 --- a/src/template/job1/extract_source1.py +++ b/src/template/job1/extract_source1.py @@ -2,8 +2,8 @@ # Bronze dimension copies. Both are full overwrites of small dimension tables, so # they are intentionally not liquid-clustered (clustering can't amortise under a -# daily full rewrite). The mutable attribute (product.unit_price) is carried -# faithfully so the silver layer sees the current price on each run. +# daily full rewrite). The mutable attribute (product.name) is carried faithfully +# so the silver layer sees the current product name on each run. DIMENSIONS = [ ("external_source.customer", "raw.customer"), ("external_source.product", "raw.product"), diff --git a/src/template/job1/generate_orders.py b/src/template/job1/generate_orders.py index 5ebf055..1d43022 100644 --- a/src/template/job1/generate_orders.py +++ b/src/template/job1/generate_orders.py @@ -8,9 +8,10 @@ def __init__(self, config): super().__init__(config) def enrich_order(self, df_customer, df_order, df_order_item, df_product): - # order_item ⨝ order ⨝ customer ⨝ product. line_revenue is computed from the - # product dimension's CURRENT unit_price at the moment this row is processed; - # the incremental MERGE in run() then freezes it (INSERT-only, never updated). + # order_item ⨝ order ⨝ customer ⨝ product. product_name is read from the + # product dimension's CURRENT value at the moment this row is processed; the + # incremental MERGE in run() then freezes it (INSERT-only, never updated), so a + # later rename never relabels already-booked orders. return ( df_order_item.join(df_order, df_order_item["id_order"] == df_order["id"]) .join(df_customer, df_order["id_customer"] == df_customer["id"]) @@ -30,8 +31,6 @@ def enrich_order(self, df_customer, df_order, df_order_item, df_product): df_order_item["desc_item"].alias("item_description"), df_order_item["qty"].alias("item_quantity"), df_order_item["total_item"].alias("item_total"), - (df_order_item["qty"] * df_product["unit_price"]).cast("double").alias("line_revenue"), - df_product["unit_price"].alias("unit_price_at_sale"), ) ) @@ -49,8 +48,8 @@ def run(self): if first_run: # Backfill: the initial 2M orders span ~1 year of dates, so we can't filter - # by seed_date — process the whole table once. Every row freezes at the - # current price (synthetic backfill has no historical price to honour). + # by seed_date — process the whole table once. Every row freezes the current + # product_name (synthetic backfill has no rename history to honour). df_order = self.spark.read.table("raw.order") df_order_item = self.spark.read.table("raw.order_item") df_out = self.enrich_order(df_customer, df_order, df_order_item, df_product) @@ -58,8 +57,8 @@ def run(self): else: # Daily incremental: only orders for seed_date are new (raw.order.date is the # source string date). Enrich just those and INSERT (no UPDATE) — existing - # rows keep their own frozen price, so a later price change never restates - # already-booked revenue. + # rows keep their own frozen product_name, so a later rename never relabels + # already-booked orders. df_order = self.spark.read.table("raw.order").filter(F.col("date") == seed_date) df_order_item = self.spark.read.table("raw.order_item") df_new = self.enrich_order(df_customer, df_order, df_order_item, df_product) diff --git a/src/template/job1/generate_orders_agg.py b/src/template/job1/generate_orders_agg.py index e1907b9..1441af9 100644 --- a/src/template/job1/generate_orders_agg.py +++ b/src/template/job1/generate_orders_agg.py @@ -8,8 +8,8 @@ def __init__(self, config): super().__init__(config) def aggregate_orders(self, df_order): - # total_value sums the frozen line_revenue (qty × unit_price-at-sale), not the - # raw item_total — so a later price change never restates historical revenue. + # total_value sums item_total — the line value the source froze on the order at + # sale time, so a later price change never restates historical revenue. return df_order.groupBy( "customer_name", "country", @@ -20,7 +20,7 @@ def aggregate_orders(self, df_order): "category_name", ).agg( F.sum("item_quantity").alias("total_quantity"), - F.sum("line_revenue").alias("total_value"), + F.sum("item_total").alias("total_value"), F.countDistinct("order_id").alias("total_orders"), ) diff --git a/src/template/job1/seed_sources.py b/src/template/job1/seed_sources.py index da723f0..a87d8e8 100644 --- a/src/template/job1/seed_sources.py +++ b/src/template/job1/seed_sources.py @@ -18,24 +18,32 @@ _INCREMENTAL_ORDERS = 5_000 _INCREMENTAL_CUSTOMER_UPDATES = 50 -_INCREMENTAL_PRICE_UPDATES = 5 # products whose unit_price changes each day +_INCREMENTAL_NAME_UPDATES = 2 # products whose name changes each day def _product_category(product_id: int) -> tuple[int, str]: """Stable category attribute of a product, mirroring the Spark expression in - _seed_initial: category = (product_id - 1) % 10 + 1. Used to build the price-update + _seed_initial: category = (product_id - 1) % 10 + 1. Used to build the name-update rows so the formula lives in one place and the two product-writing paths can't drift.""" cat = (product_id - 1) % 10 + 1 return cat, f"Category {cat}" +def _product_unit_price(product_id: int) -> float: + """Static unit_price of a product, mirroring the Spark expression in _seed_initial. + unit_price never changes after the initial load; it is carried on name-update rows + only to satisfy product_schema (the MERGE updates name alone).""" + return float(((product_id - 1) % 10 + 1) * 5.0 + 4.99) + + class SeedSources(BaseTask): """ Idempotent seeder for external_source tables. First run (empty tables): full initial load — 500 customers, 2M orders, 6M order_items. - Subsequent runs: append 5 000 orders (+1 item each) and update 50 customers' country. - Incremental IDs are anchored to seed_date so reruns of the same date are no-ops. + Subsequent runs: append 5 000 orders (+1 item each), update 50 customers' country, and + rename 2 products (e.g. "Product 1" → "Product 1.1"). Incremental IDs and renames are + anchored to seed_date so reruns of the same date are no-ops. """ def run(self) -> None: @@ -107,10 +115,11 @@ def _seed_initial(self, catalog: str, seed_date: str) -> None: .alias("country"), ).write.mode("overwrite").option("overwriteSchema", "false").saveAsTable(f"{catalog}.{SCHEMA}.customer") - # Product dimension: 100 products. unit_price starts category-banded - # (category = (product_id-1) % 10 + 1, so price spreads $10–$55) and is bumped - # over time by the incremental seed — that mutation is what the silver freeze - # vs. restate behaviour is demonstrated against. name is the readable label. + # Product dimension: 100 products. unit_price is category-banded + # (category = (product_id-1) % 10 + 1, so price spreads $10–$55) and STATIC — it + # never changes after this load. name ("Product N") is the mutable attribute: the + # incremental seed renames a couple of products each day, and that mutation is what + # the silver freeze vs. restate behaviour is demonstrated against. self.spark.range(1, _INITIAL_PRODUCTS + 1).select( F.col("id").cast(IntegerType()).alias("product_id"), F.concat(F.lit("Product "), F.col("id")).alias("name"), @@ -164,22 +173,22 @@ def _seed_incremental(self, catalog: str, seed_date: str) -> None: WHEN MATCHED THEN UPDATE SET t.country = s.country """) - df_prices = self._build_price_updates(seed_date) - df_prices.createOrReplaceTempView("_seed_price_updates") + df_names = self._build_name_updates(seed_date) + df_names.createOrReplaceTempView("_seed_name_updates") self.spark.sql(f""" MERGE INTO {catalog}.{SCHEMA}.product AS t - USING _seed_price_updates AS s ON t.product_id = s.product_id - WHEN MATCHED THEN UPDATE SET t.unit_price = s.unit_price + USING _seed_name_updates AS s ON t.product_id = s.product_id + WHEN MATCHED THEN UPDATE SET t.name = s.name """) day_offset = (_date.fromisoformat(seed_date) - _EPOCH).days self.logger.info( - "incremental seed complete date=%s orders=%d items=%d customers_updated=%d prices_updated=%d country=%s", + "incremental seed complete date=%s orders=%d items=%d customers_updated=%d names_updated=%d country=%s", seed_date, _INCREMENTAL_ORDERS, _INCREMENTAL_ORDERS, _INCREMENTAL_CUSTOMER_UPDATES, - _INCREMENTAL_PRICE_UPDATES, + _INCREMENTAL_NAME_UPDATES, _COUNTRIES[day_offset % len(_COUNTRIES)], ) @@ -218,23 +227,25 @@ def _build_customer_updates(self, seed_date: str): update_rows = [(cid, f"Customer_{cid}", new_country) for cid in customer_ids] return self.spark.createDataFrame(update_rows, schema=customer_schema) - def _build_price_updates(self, seed_date: str): - # Bump 5 products' unit_price each day by a day-anchored factor. Anchoring the - # factor to seed_date keeps reruns of the same date idempotent (same result). + @staticmethod + def _products_renamed_on(day_offset: int) -> list[int]: + """The product ids selected for a rename on a given day, by a day-anchored + rotation. Deterministic so reruns of the same date are idempotent.""" + start = day_offset * _INCREMENTAL_NAME_UPDATES % _INITIAL_PRODUCTS + return [((start + i) % _INITIAL_PRODUCTS) + 1 for i in range(_INCREMENTAL_NAME_UPDATES)] + + def _build_name_updates(self, seed_date: str): + # Rename 2 products each day: "Product N" → "Product N.k", where k is the + # cumulative number of times product N has been selected through this day. The + # suffix increments on each successive rename (Product 1.1, Product 1.2, ...). + # Everything is a deterministic function of the day offset, so reruns of the same + # date produce identical names (idempotent MERGE). day_offset = (_date.fromisoformat(seed_date) - _EPOCH).days - start = day_offset * _INCREMENTAL_PRICE_UPDATES % _INITIAL_PRODUCTS - product_ids = [((start + i) % _INITIAL_PRODUCTS) + 1 for i in range(_INCREMENTAL_PRICE_UPDATES)] - factor = 1.0 + ((day_offset % 5) - 2) * 0.05 # -10% .. +10% - # The MERGE below only updates unit_price; category columns are carried solely to - # satisfy product_schema, but we derive them from the shared helper so they stay - # correct if the MERGE is ever extended to category. - update_rows = [ - ( - pid, - f"Product {pid}", - round(((pid - 1) % 10 + 1) * 5.0 + 4.99) * factor, - *_product_category(pid), - ) - for pid in product_ids - ] + # The MERGE below only updates name; unit_price/category columns are carried solely + # to satisfy product_schema and derived from the shared helpers so they stay correct + # if the MERGE is ever extended. + update_rows = [] + for pid in self._products_renamed_on(day_offset): + k = sum(1 for d in range(day_offset + 1) if pid in self._products_renamed_on(d)) + update_rows.append((pid, f"Product {pid}.{k}", _product_unit_price(pid), *_product_category(pid))) return self.spark.createDataFrame(update_rows, schema=product_schema) diff --git a/src/template/job1_sdp/pipeline.py b/src/template/job1_sdp/pipeline.py index c5ddc67..13021ae 100644 --- a/src/template/job1_sdp/pipeline.py +++ b/src/template/job1_sdp/pipeline.py @@ -22,13 +22,13 @@ --------------------------------------------------- Silver is a STREAMING table fed by a stream–static join: the order_item fact streams, while order / customer / product are read static. Each order_item is appended exactly -once and never reprocessed, so the line_revenue (item_quantity × unit_price) computed at -append time is frozen — a later product price change in raw.product_sdp only affects NEW -rows, it does not restate already-booked revenue. A materialized view, by contrast, -recomputes from current inputs on every refresh and WOULD restate price; that is why -silver had to become a streaming table for the freeze to hold. Because customer is read -static, its country attribute is captured at append time too — the whole enriched row is -frozen. Gold stays a materialized_view: it re-sums already-frozen silver, so it is stable. +once and never reprocessed, so the product_name read at append time is frozen onto the +row — a later product rename in raw.product_sdp only affects NEW rows, it does not relabel +already-booked orders. A materialized view, by contrast, recomputes from current inputs on +every refresh and WOULD restate the name; that is why silver had to become a streaming +table for the freeze to hold. Because customer is read static, its country attribute is +captured at append time too — the whole enriched row is frozen. Gold stays a +materialized_view: it re-sums already-frozen silver (total_value = SUM(item_total)), so it is stable. DQX is intentionally absent from this pipeline — apply_checks() stamps run_time timestamps on violation structs, which Enzyme treats as non-deterministic and @@ -99,7 +99,7 @@ def raw_customer_sdp(): @dp.materialized_view( name=f"{_catalog}.raw.product_sdp", comment="Bronze: full copy of external_source.product (mirrors ExtractSource1). " - "Static dimension joined into silver; its mutable unit_price is what silver freezes.", + "Static dimension joined into silver; its mutable name is what silver freezes.", cluster_by=["product_id"], ) def raw_product_sdp(): @@ -125,20 +125,20 @@ def raw_order_sdp(): @dp.table( name=f"{_catalog}.raw.order_item_sdp", comment="Bronze: streaming append of external_source.order_item. Streaming (not a " - "materialized view) so silver can be a streaming table and freeze price on append.", + "materialized view) so silver can be a streaming table and freeze product_name on append.", cluster_by=["id_order"], ) def raw_order_item_sdp(): return spark.readStream.table(f"{_catalog}.external_source.order_item") -# ── Silver: curated.order_enriched_sdp (STREAMING — freezes price) ──────────── +# ── Silver: curated.order_enriched_sdp (STREAMING — freezes product_name) ───── @dp.table( name=f"{_catalog}.curated.order_enriched_sdp", comment="Silver: order_item ⨝ order ⨝ customer ⨝ product (mirrors GenerateOrders). " - "Streaming fact + static dims: each row appended once, so line_revenue is frozen.", + "Streaming fact + static dims: each row appended once, so product_name is frozen.", cluster_by=["order_date"], ) def curated_order_enriched_sdp(): diff --git a/src/template/job1_sdp/transforms.py b/src/template/job1_sdp/transforms.py index 40c1560..d5894ef 100644 --- a/src/template/job1_sdp/transforms.py +++ b/src/template/job1_sdp/transforms.py @@ -19,22 +19,22 @@ def enrich_order( """ Four-way join: order_item ⨝ order ⨝ customer ⨝ product. - Mirrors GenerateOrders.enrich_order exactly. line_revenue = item_quantity × unit_price - is computed from the product dimension's current price; the streaming materialization - in pipeline.py then freezes it (each order_item row is appended once, never - reprocessed) — matching the INSERT-only MERGE freeze on the batch path. + Mirrors GenerateOrders.enrich_order exactly. product_name is read from the product + dimension's current value; the streaming materialization in pipeline.py then freezes + it (each order_item row is appended once, never reprocessed) — matching the INSERT-only + MERGE freeze on the batch path, so a later rename never relabels booked orders. Args: df_customer: raw.customer_sdp df_order: raw.order_sdp df_order_item: raw.order_item_sdp (the streaming fact) - df_product: raw.product_sdp (static dimension — current price) + df_product: raw.product_sdp (static dimension — current name) Returns: Enriched DataFrame with columns: customer_name, country, customer_id, order_id, order_total, order_date, product_id, product_name, product_category_id, category_name, item_seq, item_description, - item_quantity, item_total, line_revenue, unit_price_at_sale + item_quantity, item_total """ return ( df_order_item.join(df_order, df_order_item["id_order"] == df_order["id"]) @@ -55,8 +55,6 @@ def enrich_order( df_order_item["desc_item"].alias("item_description"), df_order_item["qty"].alias("item_quantity"), df_order_item["total_item"].alias("item_total"), - (df_order_item["qty"] * df_product["unit_price"]).cast("double").alias("line_revenue"), - df_product["unit_price"].alias("unit_price_at_sale"), ) ) @@ -65,8 +63,8 @@ def aggregate_orders(df_order_enriched: DataFrame) -> DataFrame: """ Aggregate enriched orders by the report dimensions. - Mirrors GenerateOrdersAgg.aggregate_orders exactly. total_value sums the frozen - line_revenue, not the raw item_total. + Mirrors GenerateOrdersAgg.aggregate_orders exactly. total_value sums item_total — + the line value the source froze on the order at sale time. Args: df_order_enriched: curated.order_enriched_sdp @@ -86,6 +84,6 @@ def aggregate_orders(df_order_enriched: DataFrame) -> DataFrame: "category_name", ).agg( F.sum("item_quantity").alias("total_quantity"), - F.sum("line_revenue").alias("total_value"), + F.sum("item_total").alias("total_value"), F.countDistinct("order_id").alias("total_orders"), ) diff --git a/tests/job1/integration_setup.py b/tests/job1/integration_setup.py index ca7c451..4159935 100644 --- a/tests/job1/integration_setup.py +++ b/tests/job1/integration_setup.py @@ -58,8 +58,9 @@ def _seed_load_test(self, catalog): F.lit("USA").alias("country"), ).write.saveAsTable(f"{catalog}.{SCHEMA}.customer") - # 100 products (ids 1–100). unit_price=25.0 so line_revenue = qty(2) × 25 = 50, - # keeping the per-group total_value (120 items × 50 = 6,000) that _validate_load_test asserts. + # 100 products (ids 1–100). unit_price is a static attribute (25.0); the load + # test's per-group total_value (120 items × item_total 50 = 6,000) comes from + # item_total, which _validate_load_test asserts. self.spark.range(1, 101).select( F.col("id").cast(IntegerType()).alias("product_id"), F.concat(F.lit("Product "), F.col("id")).alias("name"), diff --git a/tests/job1/integration_validate.py b/tests/job1/integration_validate.py index 91b7ae1..b16c766 100644 --- a/tests/job1/integration_validate.py +++ b/tests/job1/integration_validate.py @@ -14,12 +14,12 @@ def __init__(self, config): def _validate_standard(self, catalog): # groupBy(customer_name, country, order_date, product_id, product_name, # product_category_id, category_name) → still 2 rows (one per order). - # total_value is now sum(line_revenue) = qty × unit_price-at-sale: - # John Doe: 2×$10 + 1×$10 = $30 (product 1 @ $10) - # Jane Smith: 3×$20 = $60 (product 2 @ $20) + # total_value is sum(item_total), the line value frozen on the order: + # John Doe: $50 + $50 = $100 (items A + B on order 1) + # Jane Smith: $151 (item C on order 2) expected_data = [ - ("John Doe", "USA", date(2023, 1, 1), 1, "Product 1", 1, "Category 1", 3, 30.0, 1), - ("Jane Smith", "UK", date(2023, 1, 2), 2, "Product 2", 2, "Category 2", 3, 60.0, 1), + ("John Doe", "USA", date(2023, 1, 1), 1, "Product 1", 1, "Category 1", 3, 100.0, 1), + ("Jane Smith", "UK", date(2023, 1, 2), 2, "Product 2", 2, "Category 2", 3, 151.0, 1), ] df_expected = self.spark.createDataFrame(expected_data, schema=order_agg_schema) diff --git a/tests/job1/unit_test.py b/tests/job1/unit_test.py index e06a090..ff5739d 100644 --- a/tests/job1/unit_test.py +++ b/tests/job1/unit_test.py @@ -13,7 +13,7 @@ SeedSources, _INCREMENTAL_ORDERS, _INCREMENTAL_CUSTOMER_UPDATES, - _INCREMENTAL_PRICE_UPDATES, + _INCREMENTAL_NAME_UPDATES, ) from template.commonSchemas import ( customer_schema, @@ -63,8 +63,7 @@ def df_orders_from_source(spark) -> DataFrame: @pytest.fixture def df_orders(spark) -> DataFrame: - # Enriched output of enrich_order: line_revenue = item_quantity × unit_price-at-sale. - # product 1 = "Product 1" priced 10.0, product 2 = "Product 2" priced 25.0. + # Enriched output of enrich_order. product 1 = "Product 1", product 2 = "Product 2". orders_data = [ ( "John Doe", @@ -81,8 +80,6 @@ def df_orders(spark) -> DataFrame: "Item A", 2, 50.0, - 20.0, - 10.0, ), ( "John Doe", @@ -99,8 +96,6 @@ def df_orders(spark) -> DataFrame: "Item B", 1, 50.0, - 10.0, - 10.0, ), ( "Jane Smith", @@ -117,8 +112,6 @@ def df_orders(spark) -> DataFrame: "Item C", 3, 150.0, - 75.0, - 25.0, ), ] return spark.createDataFrame(orders_data, schema=order_enriched_schema) @@ -233,10 +226,10 @@ def test_aggregate_orders(spark, config, df_orders): assert df_out.count() == 2 - # total_value = sum(line_revenue): John 20+10=30, Jane 75 + # total_value = sum(item_total): John 50+50=100, Jane 150 expected_data = [ - ("John Doe", "USA", date(2023, 1, 1), 1, "Product 1", 1, "Category 1", 3, 30.0, 1), - ("Jane Smith", "UK", date(2023, 1, 2), 2, "Product 2", 2, "Category 2", 3, 75.0, 1), + ("John Doe", "USA", date(2023, 1, 1), 1, "Product 1", 1, "Category 1", 3, 100.0, 1), + ("Jane Smith", "UK", date(2023, 1, 2), 2, "Product 2", 2, "Category 2", 3, 150.0, 1), ] df_expected = spark.createDataFrame(expected_data, schema=order_agg_schema) @@ -271,21 +264,30 @@ def test_seed_sources_customer_updates_schema(spark, config): assert df.columns == [f.name for f in customer_schema] -def test_seed_sources_price_updates_schema(spark, config): +def test_seed_sources_name_updates_schema(spark, config): task = SeedSources(config) - df = task._build_price_updates("2024-01-01") - assert df.count() == _INCREMENTAL_PRICE_UPDATES + df = task._build_name_updates("2024-01-01") + assert df.count() == _INCREMENTAL_NAME_UPDATES assert df.columns == [f.name for f in product_schema] -def test_seed_sources_price_updates_deterministic_per_date(spark, config): - """Same seed_date must produce identical price updates (idempotent reruns).""" +def test_seed_sources_name_updates_deterministic_per_date(spark, config): + """Same seed_date must produce identical name updates (idempotent reruns).""" task = SeedSources(config) - a = {(r.product_id, r.unit_price) for r in task._build_price_updates("2024-01-05").collect()} - b = {(r.product_id, r.unit_price) for r in task._build_price_updates("2024-01-05").collect()} + a = {(r.product_id, r.name) for r in task._build_name_updates("2024-01-05").collect()} + b = {(r.product_id, r.name) for r in task._build_name_updates("2024-01-05").collect()} assert a == b +def test_seed_sources_name_updates_suffix_pattern(spark, config): + """Renamed products follow the 'Product .' pattern, k = cumulative rename count.""" + task = SeedSources(config) + # day 0 renames the first products for the first time → suffix .1 + rows = {r.product_id: r.name for r in task._build_name_updates("2024-01-01").collect()} + for pid, name in rows.items(): + assert name == f"Product {pid}.1" + + def test_seed_sources_incremental_ids_unique_across_days(spark, config): """Order IDs generated for consecutive days must not collide.""" task = SeedSources(config) diff --git a/tests/job1/unit_test_sdp.py b/tests/job1/unit_test_sdp.py index f3f58b0..487d789 100644 --- a/tests/job1/unit_test_sdp.py +++ b/tests/job1/unit_test_sdp.py @@ -54,8 +54,6 @@ def df_orders_enriched(spark) -> DataFrame: "Item A", 2, 50.0, - 20.0, - 10.0, ), ( "John Doe", @@ -72,8 +70,6 @@ def df_orders_enriched(spark) -> DataFrame: "Item B", 1, 50.0, - 10.0, - 10.0, ), ( "Jane Smith", @@ -90,8 +86,6 @@ def df_orders_enriched(spark) -> DataFrame: "Item C", 3, 150.0, - 75.0, - 25.0, ), ] return spark.createDataFrame(data, schema=order_enriched_schema) @@ -142,8 +136,6 @@ def test_enrich_order_columns(spark): "item_description", "item_quantity", "item_total", - "line_revenue", - "unit_price_at_sale", } df_customer = spark.createDataFrame([(10, "Alice", "US")], schema=customer_schema) @@ -169,8 +161,8 @@ def test_aggregate_orders_values(spark, df_orders_enriched): df_out = aggregate_orders(df_orders_enriched) expected_data = [ - ("John Doe", "USA", date(2023, 1, 1), 1, "Product 1", 1, "Category 1", 3, 30.0, 1), # line_revenue 20+10=30 - ("Jane Smith", "UK", date(2023, 1, 2), 2, "Product 2", 2, "Category 2", 3, 75.0, 1), # line_revenue 75 + ("John Doe", "USA", date(2023, 1, 1), 1, "Product 1", 1, "Category 1", 3, 100.0, 1), # item_total 50+50=100 + ("Jane Smith", "UK", date(2023, 1, 2), 2, "Product 2", 2, "Category 2", 3, 150.0, 1), # item_total 150 ] df_expected = spark.createDataFrame(expected_data, schema=order_agg_schema)