- Introduction
- What is DBT?
- Why DBT for This Project?
- DBT Architecture in Our Project
- Setting Up DBT Locally
- Understanding Our DBT Project Structure
- Bronze Layer: Sources
- Silver Layer: Transformations
- Gold Layer: Analytics
- Custom Macros
- Data Quality Testing
- Running DBT
- Debugging and Troubleshooting
- Best Practices
- Further Reading
Welcome to the K2 Reference Data Platform! This guide explains how we use DBT (Data Build Tool) to transform raw cryptocurrency instrument data into analytics-ready tables. By the end of this guide, you'll understand:
- Why we chose DBT for data transformations
- How our Bronze → Silver → Gold medallion architecture works
- How to run DBT transformations locally
- How to write and test new DBT models
- How our custom bitemporal SCD Type 2 logic works
Prerequisites: Basic SQL knowledge. No prior DBT experience required!
DBT stands for Data Build Tool. It's a transformation framework that lets you:
- Write SQL transformations as modular models
- Define dependencies between models (DAG - Directed Acyclic Graph)
- Test data quality with built-in and custom tests
- Document your data with auto-generated lineage graphs
- Deploy incrementally for efficiency
Think of DBT as "software engineering best practices for data transformations."
- Official Docs: https://docs.getdbt.com/
- DBT Learn: https://courses.getdbt.com/
- DBT Discourse: https://discourse.getdbt.com/
| Tool | Use Case | Our Choice |
|---|---|---|
| Spark | High-volume streaming (1M+ msg/sec) | Used in k2-market-data-platform |
| DBT | Batch transformations (hourly/daily) | Used here for reference data |
| Custom Python | One-off scripts | Avoid (reinventing the wheel) |
Why DBT wins for reference data:
- Reference data changes infrequently (~10x/day)
- SQL-based: Accessible to analysts, not just engineers
- Built-in data quality testing
- Excellent for SCD Type 2 (Slowly Changing Dimensions)
We ingest raw instrument specifications from exchanges (Binance, Kraken) into a Bronze layer (raw JSON). We need to:
- Parse JSON into structured columns
- Track changes over time (SCD Type 2)
- Handle late corrections (bitemporal modeling)
- Normalize symbols across exchanges (symbology)
- Ensure data quality (no nulls, valid ranges, etc.)
DBT provides:
✅ Declarative SQL: Define transformations in .sql files, not code
✅ Incremental Models: Process only new data (not full refreshes)
✅ Built-in Testing: unique, not_null, relationships, custom SQL tests
✅ SCD Type 2 Support: Via dbt-utils and custom macros
✅ Documentation: Auto-generated data lineage and column descriptions
✅ Version Control: SQL files in Git, reviewable PRs
See ADR-003: DBT vs Spark for the full rationale.
Key Quote:
"Reference data has ~10k instruments with ~10 changes/day. DBT can handle this on a laptop. Spark would be overkill and add unnecessary complexity."
We use a Bronze → Silver → Gold pattern:
┌────────────────────────────────────────────────────────────┐
│ BRONZE: Raw Event Log │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ bronze_instruments_binance (Iceberg) │ │
│ │ - Full API response JSON │ │
│ │ - Append-only (immutable) │ │
│ │ - 7-day retention │ │
│ └──────────────────────────────────────────────────────┘ │
└────────────────────────────────────────────────────────────┘
│
│ DBT Source Definition
▼
┌────────────────────────────────────────────────────────────┐
│ SILVER: Bitemporal Dimensions │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ silver_instruments (SCD Type 2) │ │
│ │ - Parsed JSON → structured columns │ │
│ │ - valid_from, valid_to (business time) │ │
│ │ - record_created_at (system time) │ │
│ │ - Data quality tests applied │ │
│ └──────────────────────────────────────────────────────┘ │
└────────────────────────────────────────────────────────────┘
│
│ DBT Transformation
▼
┌────────────────────────────────────────────────────────────┐
│ GOLD: Analytics-Ready │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ gold_symbology_master │ │
│ │ - Canonical IDs (BTC-USD-SPOT) │ │
│ │ - Cross-exchange mappings │ │
│ │ - One row per instrument │ │
│ └──────────────────────────────────────────────────────┘ │
└────────────────────────────────────────────────────────────┘
DBT does NOT:
- ❌ Ingest data from exchanges (that's Python ingestion)
- ❌ Expose APIs (that's FastAPI)
- ❌ Store data (that's Iceberg/MinIO)
DBT DOES:
- ✅ Read from Bronze Iceberg tables
- ✅ Transform Bronze → Silver (parse JSON, SCD Type 2)
- ✅ Transform Silver → Gold (symbology mapping)
- ✅ Test data quality at each layer
- ✅ Generate documentation and lineage
-
Project installed:
make install-dev
-
Docker services running:
make docker-up
-
Infrastructure initialized:
make init-infra
# Check DBT version
dbt --version
# Expected output:
# Core:
# - installed: 1.7.x
# - latest: 1.7.x
# Plugins:
# - duckdb: 1.7.xcd dbt/ # DBT project root
# Key files:
ls -la
# dbt_project.yml <- Project configuration
# profiles.yml <- Connection settings
# models/ <- SQL transformations
# macros/ <- Custom SQL functions
# tests/ <- Data quality testscd dbt/
# Test connection to DuckDB + Iceberg
dbt debug
# Expected output:
# Configuration:
# profiles.yml file [OK found and valid]
# dbt_project.yml file [OK found and valid]
#
# Connection:
# adapter: duckdb
# Connection test: [OK connection ok]Troubleshooting: If dbt debug fails, see Debugging and Troubleshooting.
dbt/
├── dbt_project.yml # Project configuration
├── profiles.yml # Connection profiles (dev, prod)
│
├── models/ # SQL transformations
│ ├── bronze/
│ │ └── sources.yml # Define Bronze Iceberg sources
│ ├── silver/
│ │ ├── silver_instruments.sql # Main SCD Type 2 model
│ │ └── silver_instruments.yml # Tests & documentation
│ └── gold/
│ ├── gold_symbology_master.sql # Symbology mapping
│ └── gold_symbology_master.yml # Tests & documentation
│
├── macros/ # Reusable SQL functions
│ ├── bitemporal_scd2.sql # Custom SCD Type 2 macro
│ ├── normalize_asset.sql # XBT→BTC, USDT→USD
│ └── normalize_instrument_type.sql
│
├── tests/ # Custom data quality tests
│ ├── test_no_temporal_overlaps.sql
│ └── test_every_instrument_has_current.sql
│
├── snapshots/ # DBT snapshots (not used)
├── analyses/ # Ad-hoc queries
└── seeds/ # CSV data (small reference tables)
name: 'k2_refdata'
version: '1.0.0'
config-version: 2
profile: 'k2_refdata'
model-paths: ["models"]
analysis-paths: ["analyses"]
test-paths: ["tests"]
seed-paths: ["seeds"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]
target-path: "target"
clean-targets:
- "target"
- "dbt_packages"
models:
k2_refdata:
bronze:
+materialized: view
+schema: bronze
silver:
+materialized: incremental
+schema: silver
+on_schema_change: append_new_columns # Handle schema evolution
gold:
+materialized: table
+schema: goldKey Settings:
+materialized: incremental: Silver models process only new data+on_schema_change: append_new_columns: Add new columns without breaking+schema: silver: Separate schemas for each layer
k2_refdata:
target: dev
outputs:
dev:
type: duckdb
path: ':memory:' # In-memory for development speed
extensions:
- iceberg
- httpfs
settings:
s3_endpoint: "{{ env_var('DBT_S3_ENDPOINT', 'http://localhost:9000') }}"
s3_access_key_id: "{{ env_var('DBT_S3_ACCESS_KEY_ID', 'admin') }}"
s3_secret_access_key: "{{ env_var('DBT_S3_SECRET_ACCESS_KEY', 'password') }}"
s3_use_ssl: false
prod:
type: duckdb
path: '/data/refdata.duckdb' # Persistent for production
extensions:
- iceberg
- httpfs
settings:
s3_endpoint: "{{ env_var('DBT_S3_ENDPOINT') }}"
s3_access_key_id: "{{ env_var('DBT_S3_ACCESS_KEY_ID') }}"
s3_secret_access_key: "{{ env_var('DBT_S3_SECRET_ACCESS_KEY') }}"Key Points:
- Dev: Uses
:memory:for fast iteration - Prod: Uses persistent file for replay capability
- Iceberg extension: Reads from S3/MinIO via
iceberg_scan('s3://...') - Environment variables: Configuration via
.envfile
Official Docs: https://docs.getdbt.com/reference/dbt_project.yml
Sources in DBT define where your raw data lives. They're NOT transformations—they're pointers to existing tables.
In our case, Bronze sources point to Iceberg tables written by the ingestion layer.
File: dbt/models/bronze/sources.yml
version: 2
sources:
- name: bronze
description: Raw instrument data from exchange APIs (Bronze layer)
database: refdata
schema: refdata
tables:
- name: bronze_instruments_binance
description: |
Raw Binance /exchangeInfo responses.
Stored as JSON for full fidelity.
Partitioned by days(ingestion_timestamp).
meta:
iceberg_location: 's3://refdata-warehouse/bronze/instruments/binance'
columns:
- name: ingestion_id
description: MD5 hash for idempotency (deterministic)
tests:
- unique
- not_null
- name: ingestion_timestamp
description: System time when ingestion occurred
tests:
- not_null
- name: api_response_raw
description: Complete JSON response from Binance API
tests:
- not_null
- name: response_size_bytes
description: Size of response in bytes (for monitoring)
- name: bronze_instruments_kraken
description: |
Raw Kraken /AssetPairs responses.
Stored as JSON for full fidelity.
meta:
iceberg_location: 's3://refdata-warehouse/bronze/instruments/kraken'
columns:
- name: ingestion_id
tests:
- unique
- not_null
- name: api_response_raw
tests:
- not_nullUse the source() function to reference Bronze tables:
-- In a DBT model:
SELECT *
FROM {{ source('bronze', 'bronze_instruments_binance') }}DBT compiles this to:
SELECT *
FROM refdata.bronze_instruments_binanceRun source tests:
dbt test --select source:bronzeThis validates:
ingestion_idis unique (no duplicates)- Required fields are not null
Official Docs: https://docs.getdbt.com/docs/build/sources
Transform raw JSON into structured, bitemporal dimensions:
Input (Bronze):
{
"symbol": "BTCUSDT",
"baseAsset": "BTC",
"quoteAsset": "USDT",
"filters": [
{"filterType": "PRICE_FILTER", "tickSize": "0.01"},
{"filterType": "LOT_SIZE", "stepSize": "0.00001"}
]
}Output (Silver):
| exchange | symbol | valid_from | valid_to | tick_size | lot_size |
|----------|----------|---------------------|----------|-----------|-----------|
| binance | BTCUSDT | 2024-01-10 00:00:00 | NULL | 0.01 | 0.00001 |
File: dbt/models/silver/silver_instruments.sql
{{
config(
materialized='incremental',
unique_key='instrument_sk',
on_schema_change='append_new_columns',
partition_by='exchange, months(valid_from)'
)
}}
WITH bronze_binance AS (
SELECT
ingestion_id,
ingestion_timestamp,
api_response_raw,
'binance' AS exchange
FROM {{ source('bronze', 'bronze_instruments_binance') }}
{% if is_incremental() %}
-- Only process new records
WHERE ingestion_timestamp > (
SELECT MAX(record_created_at) FROM {{ this }}
)
{% endif %}
),
bronze_kraken AS (
SELECT
ingestion_id,
ingestion_timestamp,
api_response_raw,
'kraken' AS exchange
FROM {{ source('bronze', 'bronze_instruments_kraken') }}
{% if is_incremental() %}
WHERE ingestion_timestamp > (
SELECT MAX(record_created_at) FROM {{ this }}
)
{% endif %}
),
bronze_all AS (
SELECT * FROM bronze_binance
UNION ALL
SELECT * FROM bronze_kraken
),
-- Parse JSON for Binance
parsed_binance AS (
SELECT
exchange,
ingestion_timestamp,
JSON_EXTRACT_SCALAR(symbol_json, '$.symbol') AS symbol,
JSON_EXTRACT_SCALAR(symbol_json, '$.baseAsset') AS base_asset,
JSON_EXTRACT_SCALAR(symbol_json, '$.quoteAsset') AS quote_asset,
JSON_EXTRACT_SCALAR(symbol_json, '$.status') AS status,
-- Extract tick_size from filters array
CAST(
JSON_EXTRACT_SCALAR(
JSON_EXTRACT(symbol_json, '$.filters[0]'),
'$.tickSize'
) AS DECIMAL(38,18)
) AS tick_size,
-- Extract lot_size from filters array
CAST(
JSON_EXTRACT_SCALAR(
JSON_EXTRACT(symbol_json, '$.filters[1]'),
'$.stepSize'
) AS DECIMAL(38,18)
) AS lot_size,
-- Use current timestamp as valid_from (simplification)
ingestion_timestamp AS valid_from,
NULL AS valid_to,
ingestion_timestamp AS record_created_at
FROM bronze_all,
LATERAL (
SELECT json_array_elements(
JSON_EXTRACT(api_response_raw, '$.symbols')
) AS symbol_json
)
WHERE exchange = 'binance'
),
-- Generate surrogate key (SCD Type 2 requirement)
with_surrogate_key AS (
SELECT
MD5(exchange || symbol || CAST(valid_from AS STRING)) AS instrument_sk,
*
FROM parsed_binance
)
SELECT * FROM with_surrogate_keyKey Concepts:
-
Incremental Materialization:
{% if is_incremental() %} WHERE ingestion_timestamp > (SELECT MAX(record_created_at) FROM {{ this }}) {% endif %}- On first run: Processes ALL Bronze data
- On subsequent runs: Processes ONLY new data since last run
{{ this }}refers to the current model (Silver table)
-
JSON Parsing:
JSON_EXTRACT_SCALAR(symbol_json, '$.symbol')- DuckDB function to extract values from JSON
$.symbolis JSONPath notation
-
Surrogate Key:
MD5(exchange || symbol || valid_from) AS instrument_sk
- Unique identifier for each version of an instrument
- Required for SCD Type 2 (multiple versions of same instrument)
-
Configuration:
materialized='incremental': Process only new dataunique_key='instrument_sk': Upsert on this keyon_schema_change='append_new_columns': Handle new fields gracefullypartition_by='exchange, months(valid_from)': Iceberg partitioning
Official Docs: https://docs.getdbt.com/docs/build/incremental-models
# Run Silver model
dbt run --select silver_instruments
# Check compiled SQL (what DBT actually runs)
cat target/compiled/k2_refdata/models/silver/silver_instruments.sql
# View run results
cat target/run_results.json | jq '.results[0]'Create analytics-ready tables with:
- Canonical identifiers (BTC-USD-SPOT)
- Cross-exchange mappings
- Normalized asset names (XBT → BTC)
File: dbt/models/gold/gold_symbology_master.sql
{{
config(
materialized='table',
partition_by='truncate(base_asset, 1)'
)
}}
WITH current_instruments AS (
-- Get only current (latest) version of each instrument
SELECT
exchange,
symbol,
base_asset,
quote_asset,
instrument_type
FROM {{ ref('silver_instruments') }}
WHERE valid_to IS NULL -- Current records only
),
normalized_instruments AS (
SELECT
exchange,
symbol,
{{ normalize_asset('base_asset') }} AS base_asset_norm,
{{ normalize_asset('quote_asset') }} AS quote_asset_norm,
{{ normalize_instrument_type('instrument_type') }} AS instrument_class
FROM current_instruments
),
canonical_ids AS (
-- Generate canonical IDs: BTC-USD-SPOT
SELECT DISTINCT
base_asset_norm || '-' || quote_asset_norm || '-' || instrument_class AS canonical_id,
base_asset_norm,
quote_asset_norm,
instrument_class
FROM normalized_instruments
),
-- Pivot exchange symbols
with_mappings AS (
SELECT
c.canonical_id,
c.base_asset_norm AS base_asset,
c.quote_asset_norm AS quote_asset,
c.instrument_class,
-- Binance symbol
MAX(CASE WHEN n.exchange = 'binance' THEN n.symbol END) AS binance_symbol,
-- Kraken symbol
MAX(CASE WHEN n.exchange = 'kraken' THEN n.symbol END) AS kraken_symbol,
CURRENT_TIMESTAMP AS created_at,
TRUE AS is_active
FROM canonical_ids c
LEFT JOIN normalized_instruments n
ON c.base_asset_norm = n.base_asset_norm
AND c.quote_asset_norm = n.quote_asset_norm
AND c.instrument_class = n.instrument_class
GROUP BY 1, 2, 3, 4
)
SELECT * FROM with_mappingsKey Concepts:
-
Model Reference:
FROM {{ ref('silver_instruments') }}
ref()creates dependency: Gold depends on Silver- DBT ensures Silver runs before Gold
- Enables automatic lineage tracking
-
Custom Macros:
{{ normalize_asset('base_asset') }}- Calls macro defined in
dbt/macros/normalize_asset.sql - Transforms XBT → BTC, USDT → USD
- Reusable across models
- Calls macro defined in
-
Pivoting:
MAX(CASE WHEN exchange = 'binance' THEN symbol END) AS binance_symbol
- Converts rows to columns
- One row per canonical ID, with exchange symbols as columns
Official Docs: https://docs.getdbt.com/reference/dbt-jinja-functions/ref
Macros are reusable SQL functions in Jinja templating language. Think of them as "SQL functions on steroids."
File: dbt/macros/normalize_asset.sql
{% macro normalize_asset(column_name) %}
CASE
-- Kraken uses XBT instead of BTC (ISO 4217)
WHEN {{ column_name }} IN ('XBT', 'XXBT') THEN 'BTC'
WHEN {{ column_name }} LIKE 'X%' THEN SUBSTR({{ column_name }}, 2) -- Remove X prefix
-- Kraken uses Z prefix for fiat
WHEN {{ column_name }} LIKE 'Z%' THEN SUBSTR({{ column_name }}, 2) -- Remove Z prefix
-- Stablecoin normalization
WHEN {{ column_name }} IN ('USDT', 'USDC', 'TUSD', 'BUSD') THEN 'USD'
-- Default: pass through
ELSE {{ column_name }}
END
{% endmacro %}Usage:
SELECT
{{ normalize_asset('base_asset') }} AS base_asset_normalized
FROM silver_instrumentsCompiles to:
SELECT
CASE
WHEN base_asset IN ('XBT', 'XXBT') THEN 'BTC'
WHEN base_asset LIKE 'X%' THEN SUBSTR(base_asset, 2)
WHEN base_asset LIKE 'Z%' THEN SUBSTR(base_asset, 2)
WHEN base_asset IN ('USDT', 'USDC', 'TUSD', 'BUSD') THEN 'USD'
ELSE base_asset
END AS base_asset_normalized
FROM silver_instrumentsFile: dbt/macros/bitemporal_scd2.sql
{% macro bitemporal_scd2(
source_relation,
target_relation,
unique_key,
check_cols,
valid_from_col='valid_from'
) %}
-- Step 1: Identify changed records
WITH source_data AS (
SELECT
{{ unique_key }} AS natural_key,
{{ valid_from_col }} AS valid_from,
{% for col in check_cols %}
{{ col }},
{% endfor %}
CURRENT_TIMESTAMP AS record_created_at
FROM {{ source_relation }}
),
existing_current AS (
SELECT *
FROM {{ target_relation }}
WHERE valid_to IS NULL -- Only current records
),
changed_records AS (
SELECT
s.*,
e.instrument_sk AS existing_sk
FROM source_data s
LEFT JOIN existing_current e
ON s.natural_key = e.{{ unique_key }}
WHERE
-- New record (not in target)
e.instrument_sk IS NULL
-- Or specifications changed
OR (
{% for col in check_cols %}
s.{{ col }} != e.{{ col }}
{% if not loop.last %}OR{% endif %}
{% endfor %}
)
)
-- Step 2: Close old records (set valid_to)
UPDATE {{ target_relation }}
SET valid_to = changed.valid_from - INTERVAL '1 second'
FROM changed_records changed
WHERE {{ target_relation }}.instrument_sk = changed.existing_sk
AND {{ target_relation }}.valid_to IS NULL;
-- Step 3: Insert new records
INSERT INTO {{ target_relation }}
SELECT
MD5({{ unique_key }} || {{ valid_from_col }}) AS instrument_sk,
*,
NULL AS valid_to,
'spec_update' AS change_reason,
'dbt_transformation' AS changed_by
FROM changed_records;
{% endmacro %}Usage:
{{ bitemporal_scd2(
source_relation=ref('bronze_instruments'),
target_relation=ref('silver_instruments'),
unique_key='exchange || symbol',
check_cols=['tick_size', 'lot_size', 'min_notional']
) }}What It Does:
- Compares source (new data) with target (existing Silver table)
- Identifies changed specifications (tick_size, lot_size, etc.)
- Closes old records by setting
valid_to - Inserts new records with
valid_to = NULL(current)
Official Docs: https://docs.getdbt.com/docs/build/jinja-macros
DBT provides standard tests out-of-the-box:
File: dbt/models/silver/silver_instruments.yml
version: 2
models:
- name: silver_instruments
description: Bitemporal instrument specifications (SCD Type 2)
columns:
- name: instrument_sk
description: Surrogate key (MD5 hash)
tests:
- unique
- not_null
- name: exchange
description: Exchange identifier
tests:
- not_null
- accepted_values:
values: ['binance', 'kraken', 'bybit']
- name: tick_size
description: Minimum price increment
tests:
- not_null
- dbt_utils.accepted_range:
min_value: 0
inclusive: false # Must be > 0
- name: valid_from
description: Business time effective start
tests:
- not_null
- name: valid_to
description: Business time effective end
tests:
- dbt_utils.expression_is_true:
expression: "valid_to IS NULL OR valid_to > valid_from"Running Tests:
# Run all tests
dbt test
# Run tests for specific model
dbt test --select silver_instruments
# Run specific test type
dbt test --select test_type:uniqueFor complex business logic, write custom SQL tests:
File: dbt/tests/test_no_temporal_overlaps.sql
-- Test: No two records for same instrument should have overlapping validity periods
WITH overlaps AS (
SELECT
a.exchange,
a.symbol,
a.valid_from AS a_start,
a.valid_to AS a_end,
b.valid_from AS b_start,
b.valid_to AS b_end
FROM {{ ref('silver_instruments') }} a
JOIN {{ ref('silver_instruments') }} b
ON a.exchange = b.exchange
AND a.symbol = b.symbol
AND a.instrument_sk != b.instrument_sk -- Different records
WHERE
-- Check for overlapping periods
a.valid_from < COALESCE(b.valid_to, TIMESTAMP '9999-12-31')
AND COALESCE(a.valid_to, TIMESTAMP '9999-12-31') > b.valid_from
)
SELECT * FROM overlaps;
-- If this query returns ANY rows, test FAILSRunning Custom Tests:
dbt test --select test_name:test_no_temporal_overlaps$ dbt test
Running with dbt=1.7.0
Found 3 models, 12 tests, 0 snapshots, 0 analyses, 0 macros, 2 sources
Concurrency: 4 threads (target='dev')
1 of 12 START test unique_silver_instruments_instrument_sk .............. [RUN]
1 of 12 PASS unique_silver_instruments_instrument_sk .................... [PASS in 0.21s]
2 of 12 START test not_null_silver_instruments_tick_size ................ [RUN]
2 of 12 PASS not_null_silver_instruments_tick_size ...................... [PASS in 0.18s]
...
12 of 12 START test test_no_temporal_overlaps ........................... [RUN]
12 of 12 PASS test_no_temporal_overlaps ................................. [PASS in 0.45s]
Finished running 12 tests in 3.12s.
Completed successfully
Done. PASS=12 WARN=0 ERROR=0 SKIP=0 TOTAL=12Official Docs: https://docs.getdbt.com/docs/build/data-tests
# Navigate to DBT project
cd dbt/
# Run all models
dbt run
# Run specific model
dbt run --select silver_instruments
# Run model and all downstream models
dbt run --select silver_instruments+
# Run all models in Silver layer
dbt run --select silver.*
# Run all tests
dbt test
# Test specific model
dbt test --select silver_instruments
# Compile SQL without running
dbt compile
# Generate documentation
dbt docs generate
dbt docs serve # Opens browser on http://localhost:8080| Flag | Purpose | Example |
|---|---|---|
--select |
Run specific models | dbt run --select silver_instruments |
--exclude |
Exclude models | dbt run --exclude gold.* |
--full-refresh |
Force full rebuild (ignore incremental) | dbt run --full-refresh |
--threads |
Parallel execution | dbt run --threads 4 |
--profiles-dir |
Custom profiles location | dbt run --profiles-dir ./ |
--target |
Target environment | dbt run --target prod |
# Model and all downstream dependencies
dbt run --select silver_instruments+
# Model and all upstream dependencies
dbt run --select +silver_instruments
# Model and everything (up and down)
dbt run --select +silver_instruments+
# All models in a directory
dbt run --select silver.*
# Models with specific tag
dbt run --select tag:dailyOfficial Docs: https://docs.getdbt.com/reference/node-selection/syntax
We've created convenient shortcuts in the Makefile:
# From project root
make dbt-run # Run all models
make dbt-test # Run all tests
make dbt-docs # Generate and serve documentation
make dbt-clean # Clean build artifactsError:
Database Error in model silver_instruments
Table 'refdata.bronze_instruments_binance' not found
Solution:
# Initialize Iceberg catalog
python scripts/init_iceberg_catalog.py
# Verify tables exist
python -c "
from pyiceberg.catalog import load_catalog
catalog = load_catalog('refdata', uri='http://localhost:8181')
print(catalog.list_tables('refdata'))
"Error:
IO Error: Unable to connect to S3 endpoint
Solution:
# Check MinIO is running
docker ps | grep minio
# Check environment variables
echo $DBT_S3_ENDPOINT
echo $DBT_S3_ACCESS_KEY_ID
# Test S3 connection manually
aws --endpoint-url http://localhost:9000 s3 ls s3://refdata-warehouseProblem: Running dbt run but Silver table not updating with new data
Solution:
# Force full refresh (rebuilds from scratch)
dbt run --select silver_instruments --full-refresh
# Or delete the table and re-run
dbt run-operation drop_relation --args "{relation: ref('silver_instruments')}"
dbt run --select silver_instrumentsError:
FAIL 1 test_unique_silver_instruments_instrument_sk
Debug:
# View compiled test SQL
cat target/compiled/k2_refdata/models/silver/silver_instruments.yml/unique_silver_instruments_instrument_sk.sql
# Run test SQL directly in DuckDB
duckdb -c "$(cat target/compiled/.../unique_...sql)"
# Check for duplicates
duckdb -c "
SELECT instrument_sk, COUNT(*)
FROM refdata.silver_instruments
GROUP BY instrument_sk
HAVING COUNT(*) > 1
"# Run with verbose logging
dbt run --select silver_instruments --debug
# This shows:
# - SQL compilation steps
# - Connection details
# - Execution time per step
# - Row countsDBT compiles Jinja templates into plain SQL. View what actually runs:
# Compile without running
dbt compile --select silver_instruments
# View compiled SQL
cat target/compiled/k2_refdata/models/silver/silver_instruments.sql
# This is the ACTUAL SQL that DuckDB executes
# All {{ }} Jinja is replaced with real values# DBT logs
cat dbt/logs/dbt.log
# Run results (JSON)
cat dbt/target/run_results.json | jq '.'
# Manifest (full DAG)
cat dbt/target/manifest.json | jq '.nodes | keys'Official Docs: https://docs.getdbt.com/reference/commands/debug
✅ DO: Organize by layer (bronze, silver, gold)
models/
bronze/
silver/
gold/
❌ DON'T: Mix layers or create deep hierarchies
models/
all_models/
some_model.sql
✅ DO: Use descriptive, consistent names
- Sources:
bronze_* - Dimensions:
silver_* - Facts:
silver_fact_* - Marts:
gold_*
❌ DON'T: Use abbreviations or unclear names
tmp_tbl1.sqldata.sql
✅ DO: Document models and columns
models:
- name: silver_instruments
description: |
Bitemporal instrument specifications.
Tracks changes over time using SCD Type 2.
columns:
- name: tick_size
description: Minimum price increment in quote currency❌ DON'T: Leave models undocumented
✅ DO: Test critical business logic
- Unique keys
- Not null for required fields
- Valid ranges
- Referential integrity
❌ DON'T: Over-test trivial things
- Don't test every column is not null
- Don't duplicate tests
✅ DO: Use incremental for large tables
{{ config(materialized='incremental') }}
{% if is_incremental() %}
WHERE event_time > (SELECT MAX(event_time) FROM {{ this }})
{% endif %}❌ DON'T: Use incremental for small tables (<1M rows)
- Adds complexity
- Full refresh is often faster
✅ DO: Extract repeated logic into macros
{{ normalize_asset('base_asset') }}❌ DON'T: Create macros for one-time use
- Adds abstraction overhead
- Harder to debug
✅ DO: Commit all DBT files
dbt_project.ymlprofiles.yml(without secrets!)- All models, tests, macros
packages.ymlif using packages
❌ DON'T: Commit generated files
target/dbt_packages/logs/
✅ DO: Refactor incrementally
- Create new model alongside old
- Validate results match
- Switch downstream dependencies
- Deprecate old model
❌ DON'T: Refactor in-place without validation
- Risk breaking downstream models
- No rollback path
Official Best Practices: https://docs.getdbt.com/guides/best-practices
- Docs: https://docs.getdbt.com/
- Learn: https://courses.getdbt.com/ (free courses!)
- Discourse: https://discourse.getdbt.com/ (community forum)
- Blog: https://www.getdbt.com/blog/
- Slack: https://www.getdbt.com/community/join-the-community/
- ADR-003: DBT vs Spark - Why we chose DBT
- ADR-001: Bitemporal Modeling - SCD Type 2 design
- ADR-005: Schema Evolution - Handling schema changes
- SCHEMA.md - Data model details
- TESTING.md - Testing strategy including DBT tests
- "Analytics Engineering with dbt" by Claire Carroll (dbt blog)
- "The dbt Viewpoint" - https://docs.getdbt.com/docs/about/viewpoint
- "Kimball Dimensional Modeling" (for understanding SCD Type 2)
- "Data Build Tool Best Practices" - https://docs.getdbt.com/guides/best-practices
- ✅ Complete this guide
- ⏭️ Read DBT-QUICKSTART.md for common commands
- ⏭️ Try the hands-on exercises (coming soon!)
- ⏭️ Review our actual models in
dbt/models/ - ⏭️ Ask questions in #k2-refdata-platform Slack
Stuck? Check:
- Debugging and Troubleshooting
- DBT Discourse
- Team Slack: #k2-refdata-platform
- Pair with a team member!
Found a bug in this guide? Open a PR or Slack @data-engineering
Last Updated: 2026-01-23 Maintainer: K2 Data Engineering Team Version: 1.0.0