DBT in K2 Reference Data Platform - Complete Guide

Table of Contents

  1. Introduction
  2. What is DBT?
  3. Why DBT for This Project?
  4. DBT Architecture in Our Project
  5. Setting Up DBT Locally
  6. Understanding Our DBT Project Structure
  7. Bronze Layer: Sources
  8. Silver Layer: Transformations
  9. Gold Layer: Analytics
  10. Custom Macros
  11. Data Quality Testing
  12. Running DBT
  13. Debugging and Troubleshooting
  14. Best Practices
  15. Further Reading

Introduction

Welcome to the K2 Reference Data Platform! This guide explains how we use DBT (Data Build Tool) to transform raw cryptocurrency instrument data into analytics-ready tables. By the end of this guide, you'll understand:

  • Why we chose DBT for data transformations
  • How our Bronze → Silver → Gold medallion architecture works
  • How to run DBT transformations locally
  • How to write and test new DBT models
  • How our custom bitemporal SCD Type 2 logic works

Prerequisites: Basic SQL knowledge. No prior DBT experience required!


What is DBT?

The Basics

DBT stands for Data Build Tool. It's a transformation framework that lets you:

  1. Write SQL transformations as modular models
  2. Define dependencies between models (DAG - Directed Acyclic Graph)
  3. Test data quality with built-in and custom tests
  4. Document your data with auto-generated lineage graphs
  5. Deploy incrementally for efficiency

Think of DBT as "software engineering best practices for data transformations."

DBT vs Other Tools

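| Tool          | Use Case                             | Our Choice                      |
|---------------|--------------------------------------|---------------------------------|
| Spark         | High-volume streaming (1M+ msg/sec)  | Used in k2-market-data-platform |
| DBT           | Batch transformations (hourly/daily) | Used here for reference data    |
| Custom Python | One-off scripts                      | Avoid (reinventing the wheel)   |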

Why DBT wins for reference data:

  • Reference data changes infrequently (about 10 changes per day)
  • SQL-based: Accessible to analysts, not just engineers
  • Built-in data quality testing
  • Excellent for SCD Type 2 (Slowly Changing Dimensions)

Why DBT for This Project?

The Problem

We ingest raw instrument specifications from exchanges (Binance, Kraken) into a Bronze layer (raw JSON). We need to:

  1. Parse JSON into structured columns
  2. Track changes over time (SCD Type 2)
  3. Handle late corrections (bitemporal modeling)
  4. Normalize symbols across exchanges (symbology)
  5. Ensure data quality (no nulls, valid ranges, etc.)

The DBT Solution

DBT provides:

  • ✅ Declarative SQL: Define transformations in .sql files, not code
  • ✅ Incremental Models: Process only new data (not full refreshes)
  • ✅ Built-in Testing: unique, not_null, relationships, custom SQL tests
  • ✅ SCD Type 2 Support: Via dbt-utils and custom macros
  • ✅ Documentation: Auto-generated data lineage and column descriptions
  • ✅ Version Control: SQL files in Git, reviewable PRs

Decision Context

See ADR-003: DBT vs Spark for the full rationale.

Key Quote:

"Reference data has ~10k instruments with ~10 changes/day. DBT can handle this on a laptop. Spark would be overkill and add unnecessary complexity."


DBT Architecture in Our Project

Medallion Architecture

We use a Bronze → Silver → Gold pattern:

┌────────────────────────────────────────────────────────────┐
│  BRONZE: Raw Event Log                                     │
│  ┌──────────────────────────────────────────────────────┐  │
│  │ bronze_instruments_binance (Iceberg)                 │  │
│  │ - Full API response JSON                             │  │
│  │ - Append-only (immutable)                            │  │
│  │ - 7-day retention                                    │  │
│  └──────────────────────────────────────────────────────┘  │
└────────────────────────────────────────────────────────────┘
                         │
                         │ DBT Source Definition
                         ▼
┌────────────────────────────────────────────────────────────┐
│  SILVER: Bitemporal Dimensions                             │
│  ┌──────────────────────────────────────────────────────┐  │
│  │ silver_instruments (SCD Type 2)                      │  │
│  │ - Parsed JSON → structured columns                   │  │
│  │ - valid_from, valid_to (business time)               │  │
│  │ - record_created_at (system time)                    │  │
│  │ - Data quality tests applied                         │  │
│  └──────────────────────────────────────────────────────┘  │
└────────────────────────────────────────────────────────────┘
                         │
                         │ DBT Transformation
                         ▼
┌────────────────────────────────────────────────────────────┐
│  GOLD: Analytics-Ready                                     │
│  ┌──────────────────────────────────────────────────────┐  │
│  │ gold_symbology_master                                │  │
│  │ - Canonical IDs (BTC-USD-SPOT)                       │  │
│  │ - Cross-exchange mappings                            │  │
│  │ - One row per instrument                             │  │
│  └──────────────────────────────────────────────────────┘  │
└────────────────────────────────────────────────────────────┘

DBT's Role

DBT does NOT:

  • ❌ Ingest data from exchanges (that's Python ingestion)
  • ❌ Expose APIs (that's FastAPI)
  • ❌ Store data (that's Iceberg/MinIO)

DBT DOES:

  • ✅ Read from Bronze Iceberg tables
  • ✅ Transform Bronze → Silver (parse JSON, SCD Type 2)
  • ✅ Transform Silver → Gold (symbology mapping)
  • ✅ Test data quality at each layer
  • ✅ Generate documentation and lineage

Setting Up DBT Locally

Prerequisites

  1. Project installed:

    make install-dev
  2. Docker services running:

    make docker-up
  3. Infrastructure initialized:

    make init-infra

Verify DBT Installation

# Check DBT version
dbt --version

# Expected output:
# Core:
#   - installed: 1.7.x
#   - latest:    1.7.x
# Plugins:
#   - duckdb: 1.7.x

DBT Project Location

cd dbt/  # DBT project root

# Key files:
ls -la
# dbt_project.yml       <- Project configuration
# profiles.yml          <- Connection settings
# models/               <- SQL transformations
# macros/               <- Custom SQL functions
# tests/                <- Data quality tests

Test DBT Connection

cd dbt/

# Test connection to DuckDB + Iceberg
dbt debug

# Expected output:
# Configuration:
#   profiles.yml file [OK found and valid]
#   dbt_project.yml file [OK found and valid]
#
# Connection:
#   adapter: duckdb
#   Connection test: [OK connection ok]

Troubleshooting: If dbt debug fails, see Debugging and Troubleshooting.


Understanding Our DBT Project Structure

Directory Layout

dbt/
├── dbt_project.yml              # Project configuration
├── profiles.yml                 # Connection profiles (dev, prod)
│
├── models/                      # SQL transformations
│   ├── bronze/
│   │   └── sources.yml          # Define Bronze Iceberg sources
│   ├── silver/
│   │   ├── silver_instruments.sql        # Main SCD Type 2 model
│   │   └── silver_instruments.yml        # Tests & documentation
│   └── gold/
│       ├── gold_symbology_master.sql     # Symbology mapping
│       └── gold_symbology_master.yml     # Tests & documentation
│
├── macros/                      # Reusable SQL functions
│   ├── bitemporal_scd2.sql      # Custom SCD Type 2 macro
│   ├── normalize_asset.sql      # XBT→BTC, USDT→USD
│   └── normalize_instrument_type.sql
│
├── tests/                       # Custom data quality tests
│   ├── test_no_temporal_overlaps.sql
│   └── test_every_instrument_has_current.sql
│
├── snapshots/                   # DBT snapshots (not used)
├── analyses/                    # Ad-hoc queries
└── seeds/                       # CSV data (small reference tables)

Key Configuration Files

dbt_project.yml

name: 'k2_refdata'
version: '1.0.0'
config-version: 2

profile: 'k2_refdata'

model-paths: ["models"]
analysis-paths: ["analyses"]
test-paths: ["tests"]
seed-paths: ["seeds"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]

target-path: "target"
clean-targets:
  - "target"
  - "dbt_packages"

models:
  k2_refdata:
    bronze:
      +materialized: view
      +schema: bronze
    silver:
      +materialized: incremental
      +schema: silver
      +on_schema_change: append_new_columns  # Handle schema evolution
    gold:
      +materialized: table
      +schema: gold

Key Settings:

  • +materialized: incremental: Silver models process only new data
  • +on_schema_change: append_new_columns: Add new columns without breaking
  • +schema: silver: Separate schemas for each layer

profiles.yml

k2_refdata:
  target: dev
  outputs:
    dev:
      type: duckdb
      path: ':memory:'  # In-memory for development speed
      extensions:
        - iceberg
        - httpfs
      settings:
        s3_endpoint: "{{ env_var('DBT_S3_ENDPOINT', 'localhost:9000') }}"  # host:port only; DuckDB adds the scheme
        s3_access_key_id: "{{ env_var('DBT_S3_ACCESS_KEY_ID', 'admin') }}"
        s3_secret_access_key: "{{ env_var('DBT_S3_SECRET_ACCESS_KEY', 'password') }}"
        s3_use_ssl: false
        s3_url_style: path  # path-style URLs for MinIO

    prod:
      type: duckdb
      path: '/data/refdata.duckdb'  # Persistent for production
      extensions:
        - iceberg
        - httpfs
      settings:
        s3_endpoint: "{{ env_var('DBT_S3_ENDPOINT') }}"
        s3_access_key_id: "{{ env_var('DBT_S3_ACCESS_KEY_ID') }}"
        s3_secret_access_key: "{{ env_var('DBT_S3_SECRET_ACCESS_KEY') }}"

Key Points:

  • Dev: Uses :memory: for fast iteration; nothing persists between dbt invocations, so incremental models rebuild from scratch on every run
  • Prod: Uses persistent file for replay capability
  • Iceberg extension: Reads from S3/MinIO via iceberg_scan('s3://...')
  • Environment variables: Configuration via .env file
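
The env_var() calls in profiles.yml follow a simple lookup rule: use the environment value if it is set, fall back to the default if one is given, and fail otherwise. The Python sketch below mirrors that rule for illustration only. The helper is hypothetical and is not dbt's implementation.

```python
import os


def env_var(name, default=None):
    """Return the environment value, else the default, else fail loudly."""
    value = os.environ.get(name, default)
    if value is None:
        # Mirrors the prod profile, where no default is supplied.
        raise KeyError(f"required environment variable not set: {name}")
    return value


# Dev-style lookup: falls back to 'admin' when the variable is unset.
print(env_var("DBT_S3_ACCESS_KEY_ID", "admin"))
```

This is why the prod target fails fast when a variable is missing, while the dev target quietly uses local MinIO defaults.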

Official Docs: https://docs.getdbt.com/reference/dbt_project.yml


Bronze Layer: Sources

What are Sources?

Sources in DBT define where your raw data lives. They're NOT transformations—they're pointers to existing tables.

In our case, Bronze sources point to Iceberg tables written by the ingestion layer.

Defining Bronze Sources

File: dbt/models/bronze/sources.yml

version: 2

sources:
  - name: bronze
    description: Raw instrument data from exchange APIs (Bronze layer)
    database: refdata
    schema: refdata

    tables:
      - name: bronze_instruments_binance
        description: |
          Raw Binance /exchangeInfo responses.
          Stored as JSON for full fidelity.
          Partitioned by days(ingestion_timestamp).

        meta:
          iceberg_location: 's3://refdata-warehouse/bronze/instruments/binance'

        columns:
          - name: ingestion_id
            description: MD5 hash for idempotency (deterministic)
            tests:
              - unique
              - not_null

          - name: ingestion_timestamp
            description: System time when ingestion occurred
            tests:
              - not_null

          - name: api_response_raw
            description: Complete JSON response from Binance API
            tests:
              - not_null

          - name: response_size_bytes
            description: Size of response in bytes (for monitoring)

      - name: bronze_instruments_kraken
        description: |
          Raw Kraken /AssetPairs responses.
          Stored as JSON for full fidelity.

        meta:
          iceberg_location: 's3://refdata-warehouse/bronze/instruments/kraken'

        columns:
          - name: ingestion_id
            tests:
              - unique
              - not_null
          - name: api_response_raw
            tests:
              - not_null

Querying Bronze Sources in DBT

Use the source() function to reference Bronze tables:

-- In a DBT model:
SELECT *
FROM {{ source('bronze', 'bronze_instruments_binance') }}

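DBT compiles this to the fully qualified relation, built from the database, schema, and table name in sources.yml:

SELECT *
FROM "refdata"."refdata"."bronze_instruments_binance"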

Testing Bronze Sources

Run source tests:

dbt test --select source:bronze

This validates:

  • ingestion_id is unique (no duplicates)
  • Required fields are not null

Official Docs: https://docs.getdbt.com/docs/build/sources


Silver Layer: Transformations

Goal: Bronze → Silver

Transform raw JSON into structured, bitemporal dimensions:

Input (Bronze):

{
  "symbol": "BTCUSDT",
  "baseAsset": "BTC",
  "quoteAsset": "USDT",
  "filters": [
    {"filterType": "PRICE_FILTER", "tickSize": "0.01"},
    {"filterType": "LOT_SIZE", "stepSize": "0.00001"}
  ]
}

Output (Silver):

| exchange | symbol   | valid_from          | valid_to | tick_size | lot_size  |
|----------|----------|---------------------|----------|-----------|-----------|
| binance  | BTCUSDT  | 2024-01-10 00:00:00 | NULL     | 0.01      | 0.00001   |

Silver Instruments Model

File: dbt/models/silver/silver_instruments.sql

{{
    config(
        materialized='incremental',
        unique_key='instrument_sk',
        on_schema_change='append_new_columns',
        partition_by='exchange, months(valid_from)'
    )
}}

WITH bronze_binance AS (
    SELECT
        ingestion_id,
        ingestion_timestamp,
        api_response_raw,
        'binance' AS exchange
    FROM {{ source('bronze', 'bronze_instruments_binance') }}

    {% if is_incremental() %}
    -- Only process new records
    WHERE ingestion_timestamp > (
        SELECT MAX(record_created_at) FROM {{ this }}
    )
    {% endif %}
),

bronze_kraken AS (
    SELECT
        ingestion_id,
        ingestion_timestamp,
        api_response_raw,
        'kraken' AS exchange
    FROM {{ source('bronze', 'bronze_instruments_kraken') }}

    {% if is_incremental() %}
    WHERE ingestion_timestamp > (
        SELECT MAX(record_created_at) FROM {{ this }}
    )
    {% endif %}
),

bronze_all AS (
    SELECT * FROM bronze_binance
    UNION ALL
    SELECT * FROM bronze_kraken
),

-- Parse JSON for Binance
parsed_binance AS (
    SELECT
        exchange,
        ingestion_timestamp,
        json_extract_string(symbol_json, '$.symbol') AS symbol,
        json_extract_string(symbol_json, '$.baseAsset') AS base_asset,
        json_extract_string(symbol_json, '$.quoteAsset') AS quote_asset,
        json_extract_string(symbol_json, '$.status') AS status,

        -- Extract tick_size from filters array
        -- (assumes PRICE_FILTER is the first filter)
        CAST(
            json_extract_string(symbol_json, '$.filters[0].tickSize')
            AS DECIMAL(38,18)
        ) AS tick_size,

        -- Extract lot_size from filters array
        -- (assumes LOT_SIZE is the second filter)
        CAST(
            json_extract_string(symbol_json, '$.filters[1].stepSize')
            AS DECIMAL(38,18)
        ) AS lot_size,

        -- Use the ingestion timestamp as valid_from (simplification)
        ingestion_timestamp AS valid_from,
        NULL AS valid_to,
        ingestion_timestamp AS record_created_at

    FROM bronze_all,
    LATERAL (
        -- One row per element of the $.symbols array
        SELECT unnest(
            from_json(json_extract(api_response_raw, '$.symbols'), '["JSON"]')
        ) AS symbol_json
    )
    WHERE exchange = 'binance'
),

-- Generate surrogate key (SCD Type 2 requirement)
with_surrogate_key AS (
    SELECT
        MD5(exchange || symbol || CAST(valid_from AS STRING)) AS instrument_sk,
        *
    FROM parsed_binance
)

SELECT * FROM with_surrogate_key

Key Concepts:

  1. Incremental Materialization:

    {% if is_incremental() %}
    WHERE ingestion_timestamp > (SELECT MAX(record_created_at) FROM {{ this }})
    {% endif %}
    • On first run: Processes ALL Bronze data
    • On subsequent runs: Processes ONLY new data since last run
    • {{ this }} refers to the current model (Silver table)
  2. JSON Parsing:

    json_extract_string(symbol_json, '$.symbol')
    • DuckDB function that extracts a JSON value as text
    • $.symbol is JSONPath notation
  3. Surrogate Key:

    MD5(exchange || symbol || valid_from) AS instrument_sk
    • Unique identifier for each version of an instrument
    • Required for SCD Type 2 (multiple versions of same instrument)
  4. Configuration:

    • materialized='incremental': Process only new data
    • unique_key='instrument_sk': Upsert on this key
    • on_schema_change='append_new_columns': Handle new fields gracefully
    • partition_by='exchange, months(valid_from)': Iceberg partitioning
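
One detail worth knowing about hash-based surrogate keys: concatenating key parts without a separator can map two different keys to the same input string. The Python sketch below illustrates the effect with hashlib. The exchange and symbol values are made up for the demonstration, and the "|" separator is a suggestion, not something the current model uses.

```python
import hashlib


def surrogate_key(exchange, symbol, valid_from, sep=""):
    """MD5 over the joined key parts, mirroring MD5(a || b || c) in SQL."""
    joined = sep.join([exchange, symbol, valid_from])
    return hashlib.md5(joined.encode("utf-8")).hexdigest()


# Hypothetical keys: different (exchange, symbol) pairs, same concatenation.
a = ("ab", "cUSD", "2024-01-10 00:00:00")
b = ("abc", "USD", "2024-01-10 00:00:00")

collides_plain = surrogate_key(*a) == surrogate_key(*b)
collides_delimited = surrogate_key(*a, sep="|") == surrogate_key(*b, sep="|")
print(collides_plain, collides_delimited)
```

With only a handful of fixed exchange names the risk is small in practice, but a separator removes it entirely at no cost.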

Official Docs: https://docs.getdbt.com/docs/build/incremental-models

Running Silver Model

# Run Silver model
dbt run --select silver_instruments

# Check compiled SQL (what DBT actually runs)
cat target/compiled/k2_refdata/models/silver/silver_instruments.sql

# View run results
cat target/run_results.json | jq '.results[0]'

Gold Layer: Analytics

Goal: Silver → Gold

Create analytics-ready tables with:

  • Canonical identifiers (BTC-USD-SPOT)
  • Cross-exchange mappings
  • Normalized asset names (XBT → BTC)

Gold Symbology Master Model

File: dbt/models/gold/gold_symbology_master.sql

{{
    config(
        materialized='table',
        partition_by='truncate(base_asset, 1)'
    )
}}

WITH current_instruments AS (
    -- Get only current (latest) version of each instrument
    SELECT
        exchange,
        symbol,
        base_asset,
        quote_asset,
        instrument_type
    FROM {{ ref('silver_instruments') }}
    WHERE valid_to IS NULL  -- Current records only
),

normalized_instruments AS (
    SELECT
        exchange,
        symbol,
        {{ normalize_asset('base_asset') }} AS base_asset_norm,
        {{ normalize_asset('quote_asset') }} AS quote_asset_norm,
        {{ normalize_instrument_type('instrument_type') }} AS instrument_class
    FROM current_instruments
),

canonical_ids AS (
    -- Generate canonical IDs: BTC-USD-SPOT
    SELECT DISTINCT
        base_asset_norm || '-' || quote_asset_norm || '-' || instrument_class AS canonical_id,
        base_asset_norm,
        quote_asset_norm,
        instrument_class
    FROM normalized_instruments
),

-- Pivot exchange symbols
with_mappings AS (
    SELECT
        c.canonical_id,
        c.base_asset_norm AS base_asset,
        c.quote_asset_norm AS quote_asset,
        c.instrument_class,

        -- Binance symbol
        MAX(CASE WHEN n.exchange = 'binance' THEN n.symbol END) AS binance_symbol,

        -- Kraken symbol
        MAX(CASE WHEN n.exchange = 'kraken' THEN n.symbol END) AS kraken_symbol,

        CURRENT_TIMESTAMP AS created_at,
        TRUE AS is_active

    FROM canonical_ids c
    LEFT JOIN normalized_instruments n
        ON c.base_asset_norm = n.base_asset_norm
        AND c.quote_asset_norm = n.quote_asset_norm
        AND c.instrument_class = n.instrument_class
    GROUP BY 1, 2, 3, 4
)

SELECT * FROM with_mappings

Key Concepts:

  1. Model Reference:

    FROM {{ ref('silver_instruments') }}
    • ref() creates dependency: Gold depends on Silver
    • DBT ensures Silver runs before Gold
    • Enables automatic lineage tracking
  2. Custom Macros:

    {{ normalize_asset('base_asset') }}
    • Calls macro defined in dbt/macros/normalize_asset.sql
    • Transforms XBT → BTC, USDT → USD
    • Reusable across models
  3. Pivoting:

    MAX(CASE WHEN exchange = 'binance' THEN symbol END) AS binance_symbol
    • Converts rows to columns
    • One row per canonical ID, with exchange symbols as columns
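
The canonical ID and pivot steps can be sketched in plain Python to see what the MAX(CASE ...) aggregation does when several symbols collapse onto one canonical ID. The rows below are hypothetical, already-normalized inputs, and the function name is illustrative.

```python
from collections import defaultdict

# Hypothetical normalized rows: (exchange, symbol, base, quote, instrument_class)
rows = [
    ("binance", "BTCUSDC", "BTC", "USD", "SPOT"),
    ("binance", "BTCUSDT", "BTC", "USD", "SPOT"),
    ("kraken", "XXBTZUSD", "BTC", "USD", "SPOT"),
    ("binance", "ETHUSDT", "ETH", "USD", "SPOT"),
]


def build_symbology(rows):
    """One entry per canonical ID, with one symbol column per exchange."""
    master = defaultdict(dict)
    for exchange, symbol, base, quote, cls in rows:
        canonical_id = f"{base}-{quote}-{cls}"
        column = f"{exchange}_symbol"
        current = master[canonical_id].get(column)
        # Like MAX() in SQL: keep the lexicographically largest symbol.
        master[canonical_id][column] = symbol if current is None else max(current, symbol)
    return dict(master)


symbology = build_symbology(rows)
for canonical_id, mapping in sorted(symbology.items()):
    print(canonical_id, mapping)
```

Because stablecoin quotes are normalized to USD, BTCUSDT and BTCUSDC land on the same canonical ID here and only one of them survives the pivot. Whether that is acceptable depends on how the Gold table is consumed.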

Official Docs: https://docs.getdbt.com/reference/dbt-jinja-functions/ref


Custom Macros

What are Macros?

Macros are reusable SQL snippets written in the Jinja templating language. DBT expands them into plain SQL at compile time. Think of them as "SQL functions on steroids."

Normalize Asset Macro

File: dbt/macros/normalize_asset.sql

{% macro normalize_asset(column_name) %}
CASE
    -- Kraken uses XBT instead of BTC (ISO 4217-style code)
    WHEN {{ column_name }} IN ('XBT', 'XXBT') THEN 'BTC'

    -- Kraken's legacy four-character codes carry an X (crypto) or Z (fiat)
    -- prefix, e.g. XETH, ZUSD. The length guard is a heuristic that keeps
    -- three-character tickers such as XRP or ZEC intact.
    WHEN LENGTH({{ column_name }}) = 4 AND {{ column_name }} LIKE 'X%'
        THEN SUBSTR({{ column_name }}, 2)  -- Remove X prefix
    WHEN LENGTH({{ column_name }}) = 4 AND {{ column_name }} LIKE 'Z%'
        THEN SUBSTR({{ column_name }}, 2)  -- Remove Z prefix

    -- Stablecoin normalization
    WHEN {{ column_name }} IN ('USDT', 'USDC', 'TUSD', 'BUSD') THEN 'USD'

    -- Default: pass through
    ELSE {{ column_name }}
END
{% endmacro %}

Usage:

SELECT
    {{ normalize_asset('base_asset') }} AS base_asset_normalized
FROM silver_instruments

Compiles to:

SELECT
    CASE
        WHEN base_asset IN ('XBT', 'XXBT') THEN 'BTC'
        WHEN LENGTH(base_asset) = 4 AND base_asset LIKE 'X%'
            THEN SUBSTR(base_asset, 2)
        WHEN LENGTH(base_asset) = 4 AND base_asset LIKE 'Z%'
            THEN SUBSTR(base_asset, 2)
        WHEN base_asset IN ('USDT', 'USDC', 'TUSD', 'BUSD') THEN 'USD'
        ELSE base_asset
    END AS base_asset_normalized
FROM silver_instruments
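
Prefix-stripping rules like these are easy to sanity-check by mirroring them in Python before touching the macro. The sketch below is not project code. It compares a rule that strips any leading X or Z with one restricted to four-character codes, using a few well-known tickers as inputs.

```python
STABLECOINS = {"USDT", "USDC", "TUSD", "BUSD"}


def normalize_naive(asset):
    """Strips any leading X or Z, regardless of code length."""
    if asset in ("XBT", "XXBT"):
        return "BTC"
    if asset.startswith(("X", "Z")):
        return asset[1:]
    if asset in STABLECOINS:
        return "USD"
    return asset


def normalize_guarded(asset):
    """Same rules, but only strips the prefix from four-character codes."""
    if asset in ("XBT", "XXBT"):
        return "BTC"
    if len(asset) == 4 and asset[0] in "XZ":
        return asset[1:]
    if asset in STABLECOINS:
        return "USD"
    return asset


for code in ["XXBT", "XETH", "ZUSD", "XRP", "ZEC", "USDT"]:
    print(code, normalize_naive(code), normalize_guarded(code))
```

The unguarded rule turns XRP into RP and ZEC into EC, which is why a length check (or an explicit lookup table) is the safer choice.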

Bitemporal SCD Type 2 Macro (Advanced)

File: dbt/macros/bitemporal_scd2.sql

{% macro bitemporal_scd2(
    source_relation,
    target_relation,
    unique_key,
    check_cols,
    valid_from_col='valid_from'
) %}

-- Step 1: Identify changed records.
-- Staged in a temp table because a CTE is visible to one statement only,
-- and both the UPDATE and the INSERT below need this result.
CREATE OR REPLACE TEMP TABLE changed_records AS
WITH source_data AS (
    SELECT
        {{ unique_key }} AS natural_key,
        {{ valid_from_col }} AS valid_from,
        {% for col in check_cols %}
        {{ col }},
        {% endfor %}
        CURRENT_TIMESTAMP AS record_created_at
    FROM {{ source_relation }}
),

existing_current AS (
    SELECT
        {{ unique_key }} AS natural_key,
        *
    FROM {{ target_relation }}
    WHERE valid_to IS NULL  -- Only current records
)

SELECT
    s.*,
    e.instrument_sk AS existing_sk
FROM source_data s
LEFT JOIN existing_current e
    ON s.natural_key = e.natural_key
WHERE
    -- New record (not in target)
    e.instrument_sk IS NULL
    -- Or specifications changed (IS DISTINCT FROM also catches NULL vs value)
    OR (
        {% for col in check_cols %}
        s.{{ col }} IS DISTINCT FROM e.{{ col }}
        {% if not loop.last %}OR{% endif %}
        {% endfor %}
    );

-- Step 2: Close old records (set valid_to)
UPDATE {{ target_relation }} AS t
SET valid_to = changed.valid_from - INTERVAL '1 second'
FROM changed_records changed
WHERE t.instrument_sk = changed.existing_sk
  AND t.valid_to IS NULL;

-- Step 3: Insert new records.
-- Positional insert: the select list must line up with the target's columns.
INSERT INTO {{ target_relation }}
SELECT
    MD5(natural_key || CAST(valid_from AS VARCHAR)) AS instrument_sk,
    * EXCLUDE (existing_sk),
    NULL AS valid_to,
    'spec_update' AS change_reason,
    'dbt_transformation' AS changed_by
FROM changed_records;

{% endmacro %}

Usage:

{{ bitemporal_scd2(
    source_relation=ref('bronze_instruments'),
    target_relation=ref('silver_instruments'),
    unique_key='exchange || symbol',
    check_cols=['tick_size', 'lot_size', 'min_notional']
) }}

What It Does:

  1. Compares source (new data) with target (existing Silver table)
  2. Identifies changed specifications (tick_size, lot_size, etc.)
  3. Closes old records by setting valid_to
  4. Inserts new records with valid_to = NULL (current)
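
The four steps above can be sketched on in-memory rows to make the close-and-insert behaviour visible. This is a simplified Python illustration under stated assumptions: rows are dicts, "key" stands in for the natural key, and business time is the only time axis modelled. It is not the macro itself.

```python
from datetime import datetime, timedelta


def apply_scd2(table, incoming, check_cols):
    """Close changed current rows and append new versions (mutates table)."""
    current = {r["key"]: r for r in table if r["valid_to"] is None}
    for new in incoming:
        old = current.get(new["key"])
        changed = old is None or any(old[c] != new[c] for c in check_cols)
        if not changed:
            continue
        if old is not None:
            # Close the previous version one second before the new one starts.
            old["valid_to"] = new["valid_from"] - timedelta(seconds=1)
        row = {**new, "valid_to": None}
        table.append(row)
        current[new["key"]] = row
    return table


silver = [{"key": "binanceBTCUSDT", "tick_size": "0.01",
           "valid_from": datetime(2024, 1, 10), "valid_to": None}]
update = [{"key": "binanceBTCUSDT", "tick_size": "0.10",
           "valid_from": datetime(2024, 2, 1)}]

apply_scd2(silver, update, check_cols=["tick_size"])
for r in silver:
    print(r["tick_size"], r["valid_from"], r["valid_to"])
```

Re-applying the same update is a no-op, because the current row already carries the new tick_size. That idempotency is what makes the transformation safe to re-run.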

Official Docs: https://docs.getdbt.com/docs/build/jinja-macros


Data Quality Testing

Built-in Tests

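DBT ships with four generic tests out of the box (unique, not_null, accepted_values, relationships). The dbt_utils package adds more, such as accepted_range and expression_is_true: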

File: dbt/models/silver/silver_instruments.yml

version: 2

models:
  - name: silver_instruments
    description: Bitemporal instrument specifications (SCD Type 2)

    columns:
      - name: instrument_sk
        description: Surrogate key (MD5 hash)
        tests:
          - unique
          - not_null

      - name: exchange
        description: Exchange identifier
        tests:
          - not_null
          - accepted_values:
              values: ['binance', 'kraken', 'bybit']

      - name: tick_size
        description: Minimum price increment
        tests:
          - not_null
          - dbt_utils.accepted_range:
              min_value: 0
              inclusive: false  # Must be > 0

      - name: valid_from
        description: Business time effective start
        tests:
          - not_null

      - name: valid_to
        description: Business time effective end
        tests:
          - dbt_utils.expression_is_true:
              expression: "valid_to IS NULL OR valid_to > valid_from"

Running Tests:

# Run all tests
dbt test

# Run tests for specific model
dbt test --select silver_instruments

# Run all tests of one generic type (here: unique)
dbt test --select test_name:unique

Custom Tests

For complex business logic, write custom SQL tests:

File: dbt/tests/test_no_temporal_overlaps.sql

-- Test: No two records for same instrument should have overlapping validity periods
WITH overlaps AS (
    SELECT
        a.exchange,
        a.symbol,
        a.valid_from AS a_start,
        a.valid_to AS a_end,
        b.valid_from AS b_start,
        b.valid_to AS b_end
    FROM {{ ref('silver_instruments') }} a
    JOIN {{ ref('silver_instruments') }} b
        ON a.exchange = b.exchange
        AND a.symbol = b.symbol
        AND a.instrument_sk != b.instrument_sk  -- Different records
    WHERE
        -- Check for overlapping periods
        a.valid_from < COALESCE(b.valid_to, TIMESTAMP '9999-12-31')
        AND COALESCE(a.valid_to, TIMESTAMP '9999-12-31') > b.valid_from
)

-- No trailing semicolon: dbt wraps this query in a subquery
SELECT * FROM overlaps

-- If this query returns ANY rows, test FAILS
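
The WHERE clause in this test is the standard interval-overlap predicate, with NULL treated as "open-ended". A small Python mirror, written as an illustration with a hypothetical helper name, makes the boundary cases easy to check:

```python
from datetime import datetime

FAR_FUTURE = datetime(9999, 12, 31)


def overlaps(a_from, a_to, b_from, b_to):
    """Same predicate as the SQL test; None plays the role of NULL valid_to."""
    return a_from < (b_to or FAR_FUTURE) and (a_to or FAR_FUTURE) > b_from


# A closed January version followed by an open-ended February version.
closed_then_current = overlaps(
    datetime(2024, 1, 1), datetime(2024, 1, 31, 23, 59, 59),
    datetime(2024, 2, 1), None,
)

# Two open-ended versions of the same instrument: a data quality bug.
two_current = overlaps(datetime(2024, 1, 1), None, datetime(2024, 2, 1), None)

print(closed_then_current, two_current)
```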

Running Custom Tests:

dbt test --select test_no_temporal_overlaps

Test Output

$ dbt test

Running with dbt=1.7.0
Found 3 models, 12 tests, 0 snapshots, 0 analyses, 0 macros, 2 sources

Concurrency: 4 threads (target='dev')

1 of 12 START test unique_silver_instruments_instrument_sk .............. [RUN]
1 of 12 PASS unique_silver_instruments_instrument_sk .................... [PASS in 0.21s]
2 of 12 START test not_null_silver_instruments_tick_size ................ [RUN]
2 of 12 PASS not_null_silver_instruments_tick_size ...................... [PASS in 0.18s]
...
12 of 12 START test test_no_temporal_overlaps ........................... [RUN]
12 of 12 PASS test_no_temporal_overlaps ................................. [PASS in 0.45s]

Finished running 12 tests in 3.12s.

Completed successfully

Done. PASS=12 WARN=0 ERROR=0 SKIP=0 TOTAL=12

Official Docs: https://docs.getdbt.com/docs/build/data-tests


Running DBT

Basic Commands

# Navigate to DBT project
cd dbt/

# Run all models
dbt run

# Run specific model
dbt run --select silver_instruments

# Run model and all downstream models
dbt run --select silver_instruments+

# Run all models in Silver layer (quoted so the shell does not expand the *)
dbt run --select "silver.*"

# Run all tests
dbt test

# Test specific model
dbt test --select silver_instruments

# Compile SQL without running
dbt compile

# Generate documentation
dbt docs generate
dbt docs serve  # Opens browser on http://localhost:8080

Useful Flags

| Flag           | Purpose                                 | Example                             |
|----------------|-----------------------------------------|-------------------------------------|
| --select       | Run specific models                     | dbt run --select silver_instruments |
| --exclude      | Exclude models                          | dbt run --exclude "gold.*"          |
| --full-refresh | Force full rebuild (ignore incremental) | dbt run --full-refresh              |
| --threads      | Parallel execution                      | dbt run --threads 4                 |
| --profiles-dir | Custom profiles location                | dbt run --profiles-dir ./           |
| --target       | Target environment                      | dbt run --target prod               |

DBT Graph Operators

# Model and all downstream dependencies
dbt run --select silver_instruments+

# Model and all upstream dependencies
dbt run --select +silver_instruments

# Model and everything (up and down)
dbt run --select +silver_instruments+

# All models in a directory (quoted so the shell does not expand the *)
dbt run --select "silver.*"

# Models with specific tag
dbt run --select tag:daily

Official Docs: https://docs.getdbt.com/reference/node-selection/syntax

Makefile Shortcuts

We've created convenient shortcuts in the Makefile:

# From project root
make dbt-run          # Run all models
make dbt-test         # Run all tests
make dbt-docs         # Generate and serve documentation
make dbt-clean        # Clean build artifacts

Debugging and Troubleshooting

Common Issues

1. "Iceberg table not found"

Error:

Database Error in model silver_instruments
  Table 'refdata.bronze_instruments_binance' not found

Solution:

# Initialize Iceberg catalog
python scripts/init_iceberg_catalog.py

# Verify tables exist
python -c "
from pyiceberg.catalog import load_catalog
catalog = load_catalog('refdata', uri='http://localhost:8181')
print(catalog.list_tables('refdata'))
"

2. "S3 connection failed"

Error:

IO Error: Unable to connect to S3 endpoint

Solution:

# Check MinIO is running
docker ps | grep minio

# Check environment variables
echo $DBT_S3_ENDPOINT
echo $DBT_S3_ACCESS_KEY_ID

# Test S3 connection manually
aws --endpoint-url http://localhost:9000 s3 ls s3://refdata-warehouse

3. "Incremental model not updating"

Problem: dbt run completes, but the Silver table is not updated with new data.

Solution:

# Force full refresh (rebuilds from scratch)
dbt run --select silver_instruments --full-refresh

# --full-refresh already drops and rebuilds the table, so no manual drop is needed.
# If new rows still do not appear, inspect the incremental filter in the compiled SQL:
cat target/compiled/k2_refdata/models/silver/silver_instruments.sql

4. "Test failing unexpectedly"

Error:

FAIL 1 unique_silver_instruments_instrument_sk

Debug:

# View compiled test SQL
cat target/compiled/k2_refdata/models/silver/silver_instruments.yml/unique_silver_instruments_instrument_sk.sql

# Run test SQL directly in DuckDB
duckdb -c "$(cat target/compiled/.../unique_...sql)"

# Check for duplicates
duckdb -c "
SELECT instrument_sk, COUNT(*)
FROM refdata.silver_instruments
GROUP BY instrument_sk
HAVING COUNT(*) > 1
"

DBT Debug Mode

# Run with verbose logging
dbt run --select silver_instruments --debug

# This shows:
# - SQL compilation steps
# - Connection details
# - Execution time per step
# - Row counts

Viewing Compiled SQL

DBT compiles Jinja templates into plain SQL. View what actually runs:

# Compile without running
dbt compile --select silver_instruments

# View compiled SQL
cat target/compiled/k2_refdata/models/silver/silver_instruments.sql

# This is the ACTUAL SQL that DuckDB executes
# All {{ }} Jinja is replaced with real values

Logs Location

# DBT logs
cat dbt/logs/dbt.log

# Run results (JSON)
cat dbt/target/run_results.json | jq '.'

# Manifest (full DAG)
cat dbt/target/manifest.json | jq '.nodes | keys'

Official Docs: https://docs.getdbt.com/reference/commands/debug


Best Practices

1. Model Organization

DO: Organize by layer (bronze, silver, gold)

models/
  bronze/
  silver/
  gold/

DON'T: Mix layers or create deep hierarchies

models/
  all_models/
    some_model.sql

2. Naming Conventions

DO: Use descriptive, consistent names

  • Sources: bronze_*
  • Dimensions: silver_*
  • Facts: silver_fact_*
  • Marts: gold_*

DON'T: Use abbreviations or unclear names

  • tmp_tbl1.sql
  • data.sql

3. Documentation

DO: Document models and columns

models:
  - name: silver_instruments
    description: |
      Bitemporal instrument specifications.
      Tracks changes over time using SCD Type 2.
    columns:
      - name: tick_size
        description: Minimum price increment in quote currency

DON'T: Leave models undocumented

4. Testing

DO: Test critical business logic

  • Unique keys
  • Not null for required fields
  • Valid ranges
  • Referential integrity

DON'T: Over-test trivial things

  • Don't add not_null tests to every column
  • Don't duplicate tests

5. Incremental Models

DO: Use incremental for large tables

{{ config(materialized='incremental') }}

{% if is_incremental() %}
WHERE event_time > (SELECT MAX(event_time) FROM {{ this }})
{% endif %}

DON'T: Use incremental for small tables (<1M rows)

  • Adds complexity
  • Full refresh is often faster
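
The incremental filter is a high-water-mark pattern: take the newest event_time already in the target and process only rows beyond it. The Python sketch below is a hedged illustration with hypothetical row dicts. It shows the two branches, first run versus subsequent runs, and the strict > comparison.

```python
from datetime import datetime


def incremental_batch(source_rows, target_rows):
    """Rows to process in this run, following the is_incremental() pattern."""
    if not target_rows:
        # First run (or --full-refresh): no watermark yet, take everything.
        return list(source_rows)
    watermark = max(r["event_time"] for r in target_rows)
    # Strictly greater: rows stamped exactly at the watermark are skipped.
    return [r for r in source_rows if r["event_time"] > watermark]


source = [
    {"id": 1, "event_time": datetime(2024, 1, 10, 0, 0)},
    {"id": 2, "event_time": datetime(2024, 1, 10, 1, 0)},
    {"id": 3, "event_time": datetime(2024, 1, 10, 2, 0)},
]

first_run = incremental_batch(source, [])
second_run = incremental_batch(source, source[:2])
print(len(first_run), [r["id"] for r in second_run])
```

The strict comparison means a late row carrying the same timestamp as the watermark is never picked up, which is one reason a periodic full refresh remains useful.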

6. Macros

DO: Extract repeated logic into macros

{{ normalize_asset('base_asset') }}

DON'T: Create macros for one-time use

  • Adds abstraction overhead
  • Harder to debug

7. Version Control

DO: Commit all DBT files

  • dbt_project.yml
  • profiles.yml (without secrets!)
  • All models, tests, macros
  • packages.yml if using packages

DON'T: Commit generated files

  • target/
  • dbt_packages/
  • logs/

8. Refactoring

DO: Refactor incrementally

  1. Create new model alongside old
  2. Validate results match
  3. Switch downstream dependencies
  4. Deprecate old model

DON'T: Refactor in-place without validation

  • Risk breaking downstream models
  • No rollback path

Official Best Practices: https://docs.getdbt.com/guides/best-practices


Further Reading

Recommended Reading

  1. "Analytics Engineering with dbt" by Claire Carroll (dbt blog)
  2. "The dbt Viewpoint" - https://docs.getdbt.com/docs/about/viewpoint
  3. "Kimball Dimensional Modeling" (for understanding SCD Type 2)
  4. "Data Build Tool Best Practices" - https://docs.getdbt.com/guides/best-practices

Next Steps

  1. ✅ Complete this guide
  2. ⏭️ Read DBT-QUICKSTART.md for common commands
  3. ⏭️ Try the hands-on exercises (coming soon!)
  4. ⏭️ Review our actual models in dbt/models/
  5. ⏭️ Ask questions in #k2-refdata-platform Slack

Questions?

Stuck? Check:

  1. Debugging and Troubleshooting
  2. DBT Discourse
  3. Team Slack: #k2-refdata-platform
  4. Pair with a team member!

Found a bug in this guide? Open a PR or Slack @data-engineering


Last Updated: 2026-01-23
Maintainer: K2 Data Engineering Team
Version: 1.0.0