Core Principle: "Write tests that provide confidence, not just coverage."
✅ Test:
- Business logic (bitemporal queries, SCD Type 2 transformations)
- API contracts (request validation, response schemas)
- Data quality rules (DBT tests)
- Error handling at system boundaries
- Integration points (Kafka, Iceberg, PostgreSQL)
❌ Don't Test:
- Trivial getters/setters
- Third-party library internals
- Generated code (DBT SQL unless custom macros)
- Obvious Python builtins
       /\
     /E2E \       ← 5 tests (full pipeline, slow, ~5 min)
    /------\
   / Integ  \     ← 20 tests (component boundaries, ~1 min)
  /----------\
 /    Unit    \   ← 100+ tests (business logic, fast, <10 sec)
/--------------\
Unit Tests (tests/unit/):
- Fast: No Docker dependencies, no network I/O
- Isolated: Mock external dependencies
- Numerous: Cover edge cases, error paths
Integration Tests (tests/integration/):
- Docker-based: Requires Kafka, MinIO, PostgreSQL
- Component boundaries: Ingestion → Kafka, DBT → Iceberg
- Fewer: Focus on happy path + critical failure scenarios
End-to-End Tests (tests/e2e/):
- Full pipeline: Exchange API → Bronze → Silver → Gold → API
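- Slow: Run in CI rather than locally during development (note: the CI workflow shown below currently runs only unit tests and code-quality checks, so E2E tests still need their own CI job)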
- Minimal: Happy path + 1-2 critical scenarios
tests/
├── unit/ # Fast unit tests (no Docker)
│ ├── ingestion/
│ │ ├── test_binance_client.py
│ │ ├── test_kraken_client.py
│ │ └── test_producers.py
│ ├── query/
│ │ ├── test_bitemporal_queries.py
│ │ └── test_connection_pool.py
│ └── api/
│ ├── test_instruments_router.py
│ └── test_symbology_router.py
├── integration/ # Docker-based integration tests
│ ├── test_kafka_ingestion.py
│ ├── test_dbt_transformations.py
│ └── test_api_queries.py
├── e2e/ # End-to-end pipeline tests
│ ├── test_binance_to_api.py
│ └── test_symbology_pipeline.py
├── conftest.py # Pytest fixtures (shared)
└── fixtures/ # Test data
├── binance_response.json
└── kraken_response.json
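Tests are categorized using pytest markers. Register the custom markers (`unit`, `integration`, `e2e`, `bitemporal`, `scd2`, `slow`) under `markers` in the pytest configuration; otherwise pytest emits `PytestUnknownMarkWarning`, and `--strict-markers` turns them into errors: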
import pytest


@pytest.mark.unit
def test_fast_business_logic():
    """Example unit test: pure logic, nothing external is touched."""
    ...


@pytest.mark.integration
def test_kafka_connection():
    """Example integration test: needs the Docker services running."""
    ...


@pytest.mark.e2e
def test_full_pipeline():
    """Example end-to-end test: exercises the whole pipeline, slowly."""
    ...


@pytest.mark.bitemporal
def test_point_in_time_query():
    """Example test of bitemporal (as-of) query behaviour."""
    ...


@pytest.mark.scd2
def test_scd_type_2_transformation():
    """Example test of SCD Type 2 versioning behaviour."""
    ...


@pytest.mark.slow
def test_expensive_operation():
    """Example test tagged slow (takes more than one second)."""
    ...
# All unit tests (fast, default)
make test-unit
# All integration tests (requires Docker)
make test-integration
# All tests (unit + integration + e2e)
make test-all
# With coverage report
make coverage

# Run specific test file
pytest tests/unit/ingestion/test_binance_client.py -v
# Run specific test function (node id must use the full test name)
pytest tests/unit/query/test_bitemporal_queries.py::test_query_before_change_returns_old_spec -v
# Run tests matching pattern
pytest -k "binance" -v
# Run tests with specific marker
pytest -m bitemporal -v
# Skip slow tests
pytest -m "not slow" -v
# Verbose output
pytest -vv
# Stop on first failure
pytest -x
# Show print statements
pytest -s
# Parallel execution (requires pytest-xdist)
pytest -n auto
# tests/unit/ingestion/test_binance_client.py
import pytest
from unittest.mock import patch, Mock
from refdata.ingestion.sources.binance import BinanceClient
@pytest.mark.unit
def test_fetch_instruments_success():
    """
    Given: the Binance API answers 200 with a single BTCUSDT symbol
    When: fetch_instruments() is called
    Then: the decoded JSON payload is returned as-is
    """
    client = BinanceClient()
    payload = {
        "symbols": [
            {
                "symbol": "BTCUSDT",
                "baseAsset": "BTC",
                "quoteAsset": "USDT",
                "status": "TRADING",
            }
        ]
    }

    with patch('requests.get') as fake_get:
        fake_response = fake_get.return_value
        fake_response.status_code = 200
        fake_response.json.return_value = payload
        result = client.fetch_instruments()

    assert result == payload
    symbols = result["symbols"]
    assert len(symbols) == 1
    assert symbols[0]["symbol"] == "BTCUSDT"
@pytest.mark.unit
def test_fetch_instruments_retry_on_429():
    """
    Given: Binance API returns 429 (rate limit), then 200
    When: fetch_instruments() is called
    Then: Backs off, retries once, and returns the second response's payload
    """
    client = BinanceClient()
    # Patch time.sleep so the real backoff delay doesn't make this *unit*
    # test slow (unit tests are meant to stay well under a few seconds).
    # NOTE(review): assumes the client backs off via time.sleep, directly or
    # through a retry helper that delegates to it — confirm.
    with patch('requests.get') as mock_get, patch('time.sleep') as mock_sleep:
        # First call: 429, second call: 200
        mock_get.side_effect = [
            Mock(status_code=429, text="Rate limit exceeded"),
            Mock(status_code=200, json=lambda: {"symbols": []}),
        ]
        result = client.fetch_instruments()
    # Should have retried, after backing off at least once
    assert mock_get.call_count == 2
    mock_sleep.assert_called()
    assert result == {"symbols": []}
@pytest.mark.unit
def test_fetch_instruments_invalid_response():
    """
    Given: Binance API returns 200 with a body that is not valid JSON
    When: fetch_instruments() is called
    Then: Raises ValueError with descriptive message
    """
    client = BinanceClient()
    with patch('requests.get') as mock_get:
        # Set a successful status explicitly so the client reaches the
        # JSON-decoding step. Left unset, status_code is a bare Mock that
        # never equals 200 (or 429), so a client that checks the status
        # first could fail or retry before json() is ever called.
        mock_get.return_value.status_code = 200
        mock_get.return_value.json.side_effect = ValueError("Invalid JSON")
        with pytest.raises(ValueError, match="Invalid JSON"):
            client.fetch_instruments()
# tests/unit/query/test_bitemporal_queries.py
import pytest
from datetime import datetime
from refdata.query.bitemporal import query_instruments_as_of
@pytest.mark.unit
@pytest.mark.bitemporal
def test_query_before_change_returns_old_spec(duckdb_conn_with_data):
    """
    Given: BTCUSDT whose tick_size changes from 0.01 to 0.05 on 2024-01-15
    When: queried as of 2024-01-14 12:00 (before the change takes effect)
    Then: the pre-change version (tick_size 0.01, valid from 2024-01-10) is returned
    """
    instrument = {'exchange': 'binance', 'symbol': 'BTCUSDT'}
    before_change = {
        **instrument,
        'tick_size': '0.01',
        'valid_from': datetime(2024, 1, 10),
        'valid_to': datetime(2024, 1, 15),
        'record_created_at': datetime(2024, 1, 11, 15, 0),
    }
    after_change = {
        **instrument,
        'tick_size': '0.05',
        'valid_from': datetime(2024, 1, 15),
        'valid_to': None,
        'record_created_at': datetime(2024, 1, 14, 10, 0),
    }
    conn = duckdb_conn_with_data([before_change, after_change])

    result = query_instruments_as_of(
        conn=conn,
        exchange=instrument['exchange'],
        symbol=instrument['symbol'],
        as_of=datetime(2024, 1, 14, 12, 0),
    )

    assert result is not None
    assert result['tick_size'] == before_change['tick_size']
    assert result['valid_from'] == before_change['valid_from']
@pytest.mark.unit
@pytest.mark.bitemporal
def test_late_correction_preserves_history(duckdb_conn_with_data):
    """
    Given: a change recorded on 2024-01-14 as effective 2024-01-15 00:00,
           later corrected (recorded 2024-01-16) to effective 2024-01-14 23:00
    When: querying as of 2024-01-14 23:30, i.e. after the corrected effective
          time but before the originally announced one
    Then: the corrected record is returned (latest record_created_at wins)
    """
    instrument = {'exchange': 'binance', 'symbol': 'BTCUSDT'}
    announced = {
        **instrument,
        'tick_size': '0.05',
        'valid_from': datetime(2024, 1, 15, 0, 0),
        'valid_to': None,
        'record_created_at': datetime(2024, 1, 14, 10, 0),
    }
    correction = {
        **instrument,
        'tick_size': '0.05',
        'valid_from': datetime(2024, 1, 14, 23, 0),
        'valid_to': None,
        'record_created_at': datetime(2024, 1, 16, 8, 0),
    }
    conn = duckdb_conn_with_data([announced, correction])

    result = query_instruments_as_of(
        conn=conn,
        exchange=instrument['exchange'],
        symbol=instrument['symbol'],
        as_of=datetime(2024, 1, 14, 23, 30),
    )

    assert result is not None
    assert result['tick_size'] == '0.05'
    assert result['record_created_at'] == correction['record_created_at']
@pytest.mark.unit
@pytest.mark.bitemporal
def test_query_nonexistent_instrument_returns_none(duckdb_conn_with_data):
    """
    Given: an empty silver_instruments table
    When: querying a symbol that was never loaded
    Then: None is returned rather than an exception being raised
    """
    empty_conn = duckdb_conn_with_data([])
    found = query_instruments_as_of(
        conn=empty_conn,
        exchange='binance',
        symbol='NONEXISTENT',
        as_of=datetime(2024, 1, 15),
    )
    assert found is None
# tests/unit/api/test_instruments_router.py
import pytest
from fastapi.testclient import TestClient
from datetime import datetime
@pytest.mark.unit
def test_get_instruments_success(client, mock_query_fn):
    """
    Given: the query layer returns a BTCUSDT spec
    When: GET /v1/instruments with exchange, symbol and as_of
    Then: 200, and the response's ``data`` object carries that spec
    """
    # NOTE(review): `mock_query_fn` is not defined in the conftest.py shown
    # in this guide; it presumably stands in for the query function the
    # router calls — confirm where it is provided.
    mock_query_fn.return_value = {
        'exchange': 'binance',
        'symbol': 'BTCUSDT',
        'tick_size': '0.01',
        'valid_from': datetime(2024, 1, 10),
    }
    query = {
        "exchange": "binance",
        "symbol": "BTCUSDT",
        "as_of": "2024-01-15T10:00:00Z",
    }

    response = client.get("/v1/instruments", params=query)

    assert response.status_code == 200
    instrument = response.json()['data']
    assert instrument['symbol'] == 'BTCUSDT'
    assert instrument['tick_size'] == '0.01'
@pytest.mark.unit
def test_get_instruments_missing_required_param(client):
    """
    Given: a request that omits the required ``exchange`` parameter
    When: GET /v1/instruments
    Then: 422, and the error body mentions the missing field
    """
    response = client.get("/v1/instruments", params={"symbol": "BTCUSDT"})

    assert response.status_code == 422
    assert "exchange" in str(response.json())
@pytest.mark.unit
def test_get_instruments_not_found(client, mock_query_fn):
    """
    Given: the query layer finds no matching instrument (returns None)
    When: GET /v1/instruments
    Then: 404 with error code INSTRUMENT_NOT_FOUND
    """
    mock_query_fn.return_value = None
    query = {
        "exchange": "binance",
        "symbol": "NONEXISTENT",
        "as_of": "2024-01-15T10:00:00Z",
    }

    response = client.get("/v1/instruments", params=query)

    assert response.status_code == 404
    assert response.json()['error']['code'] == 'INSTRUMENT_NOT_FOUND'
# tests/integration/test_kafka_ingestion.py
import json
import time

import pytest
from confluent_kafka import Consumer, TopicPartition
@pytest.mark.integration
def test_binance_ingestion_publishes_to_kafka(
    kafka_cluster,
    binance_client
):
    """
    Given: Binance client with mocked API response
    When: Ingestion job runs
    Then: Message published to Kafka topic
    """
    # Arrange
    topic = 'refdata.instruments.binance.raw'
    consumer = Consumer({
        # Use the bootstrap address supplied by the fixture instead of
        # hard-coding it a second time.
        'bootstrap.servers': kafka_cluster,
        'group.id': 'test-consumer',
        'auto.offset.reset': 'earliest'
    })
    try:
        consumer.subscribe([topic])

        # Act
        binance_client.ingest()

        # Assert: Consume message from Kafka
        msg = consumer.poll(timeout=10.0)
        assert msg is not None
        assert msg.error() is None, msg.error()
        value = json.loads(msg.value().decode('utf-8'))
        assert 'symbols' in value
        assert len(value['symbols']) > 0
    finally:
        # Close even when an assertion fails, so the group membership and
        # network resources are released rather than leaked.
        consumer.close()
def _high_watermarks(consumer, topic):
    """Return ``{partition_id: high_watermark}`` for every partition of *topic*.

    A topic the broker does not report yet is treated as having no partitions.
    """
    metadata = consumer.list_topics(topic, timeout=10.0)
    topic_metadata = metadata.topics.get(topic)
    partitions = topic_metadata.partitions if topic_metadata is not None else {}
    return {
        partition_id: consumer.get_watermark_offsets(
            TopicPartition(topic, partition_id), timeout=10.0
        )[1]
        for partition_id in partitions
    }


@pytest.mark.integration
def test_idempotent_ingestion_no_duplicates(
    kafka_cluster,
    binance_client
):
    """
    Given: Same API response ingested twice
    When: Ingestion runs twice with same data
    Then: Only one message in Kafka (idempotency)

    Only messages appended to the topic *during this test* are counted.
    The topic's high watermarks are captured before ingesting, and the
    consumer is assigned from those offsets. Reading from ``earliest``
    would also count messages left by other tests or earlier runs against
    the same broker. It would also race the group rebalance, since an
    early empty poll looks like "topic drained".
    """
    # Arrange
    topic = 'refdata.instruments.binance.raw'
    consumer = Consumer({
        'bootstrap.servers': kafka_cluster,
        'group.id': 'test-idempotency',
        # Offsets are assigned explicitly below; don't commit them back.
        'enable.auto.commit': False,
    })
    try:
        start = _high_watermarks(consumer, topic)

        # Act
        ingestion_id_1 = binance_client.ingest()
        ingestion_id_2 = binance_client.ingest()

        # Assert: Same ingestion_id = idempotent
        assert ingestion_id_1 == ingestion_id_2

        # Verify Kafka received only one new message
        end = _high_watermarks(consumer, topic)
        # Only partitions that grew during the test need to be read.
        pending = {p for p, high in end.items() if high > start.get(p, 0)}
        consumer.assign(
            [TopicPartition(topic, p, start.get(p, 0)) for p in pending]
        )

        messages = []
        deadline = time.monotonic() + 15.0
        while pending and time.monotonic() < deadline:
            msg = consumer.poll(timeout=1.0)
            if msg is None:
                continue
            assert msg.error() is None, msg.error()
            messages.append(msg)
            # A partition is done once we've read up to its end watermark.
            if msg.offset() + 1 >= end[msg.partition()]:
                pending.discard(msg.partition())

        # Should have exactly 1 message (deduplicated)
        assert len(messages) == 1
    finally:
        consumer.close()
# tests/integration/test_dbt_transformations.py
import pytest
import duckdb
@pytest.mark.integration
@pytest.mark.scd2
def test_dbt_scd2_closes_old_record_on_change(
    dbt_project,
    iceberg_catalog
):
    """
    Given: Existing Silver record for BTCUSDT
    When: DBT processes new Bronze record with changed tick_size
    Then: Old record gets valid_to set to the new record's valid_from,
          and the new record is inserted as current

    NOTE(review): `insert_bronze_record` and `query_silver` are helpers not
    shown in this guide — confirm where they are defined.
    """
    # Arrange: Insert Bronze record
    insert_bronze_record(iceberg_catalog, {
        'exchange': 'binance',
        'symbol': 'BTCUSDT',
        'tick_size': '0.01',
        'valid_from': '2024-01-10 00:00:00'
    })
    # Act: Run DBT transformation (initial load)
    dbt_project.run('run --select silver_instruments')
    silver_before = query_silver(iceberg_catalog, 'binance', 'BTCUSDT')
    assert len(silver_before) == 1
    assert silver_before[0]['valid_to'] is None  # Current record

    # Arrange: Insert new Bronze record with change
    insert_bronze_record(iceberg_catalog, {
        'exchange': 'binance',
        'symbol': 'BTCUSDT',
        'tick_size': '0.05',  # Changed!
        'valid_from': '2024-01-15 00:00:00'
    })
    # Act: Run DBT transformation (incremental)
    dbt_project.run('run --select silver_instruments')

    # Assert
    silver_after = query_silver(iceberg_catalog, 'binance', 'BTCUSDT')
    assert len(silver_after) == 2  # Old + New
    old_record = next((r for r in silver_after if r['tick_size'] == '0.01'), None)
    new_record = next((r for r in silver_after if r['tick_size'] == '0.05'), None)
    assert old_record is not None, "pre-change (0.01) version missing from Silver"
    assert new_record is not None, "post-change (0.05) version missing from Silver"

    # Old record closed exactly where the new one opens. Versions are
    # half-open [valid_from, valid_to), the convention the overlap test and
    # the unit fixtures use. A gap (valid_to < valid_from) would leave
    # instants with no valid version at all.
    assert old_record['valid_to'] is not None
    assert old_record['valid_to'] == new_record['valid_from']
    # New record is current
    assert new_record['valid_to'] is None
# tests/e2e/test_binance_to_api.py
import pytest
import requests
from unittest.mock import patch
@pytest.mark.e2e
def test_full_pipeline_binance_to_api(
    kafka_cluster,
    iceberg_catalog,
    dbt_runner,
    fastapi_app
):
    """
    End-to-end: Binance API → Bronze → DBT → Silver → API query

    NOTE(review): `dbt_runner`, `fastapi_app`, `query_kafka_topic` and
    `query_iceberg_table` are not defined in the conftest.py shown in this
    guide. The integration test calls `dbt_project.run('run --select ...')`
    while this one calls `dbt_runner.run('--select ...')`. Confirm which
    fixtures and runner API are current.
    """
    from datetime import datetime, timezone

    # Step 1: Mock Binance API response
    mock_binance_response = {
        'symbols': [{
            'symbol': 'BTCUSDT',
            'baseAsset': 'BTC',
            'quoteAsset': 'USDT',
            'tickSize': '0.01',
            'status': 'TRADING'
        }]
    }
    with patch('requests.get') as mock_get:
        mock_get.return_value.json.return_value = mock_binance_response
        mock_get.return_value.status_code = 200
        # Step 2: Run ingestion
        from refdata.cli.ingest import ingest_binance
        ingest_binance()

    # Step 3: Verify Bronze record created
    bronze_records = query_kafka_topic('refdata.instruments.binance.raw')
    assert len(bronze_records) == 1
    assert bronze_records[0]['symbols'][0]['symbol'] == 'BTCUSDT'

    # Step 4: Run DBT transformation
    dbt_runner.run('--select silver_instruments')

    # Step 5: Verify Silver record created
    silver_records = query_iceberg_table('refdata.silver_instruments')
    assert len(silver_records) == 1
    assert silver_records[0]['tick_size'] == '0.01'

    # Step 6: Query API as of "now".
    # The mocked payload carries no effective date, so the Silver row's
    # valid_from presumably comes from pipeline processing time. A fixed
    # historical as_of (e.g. 2024-01-11) would then predate every version
    # and yield 404.
    # NOTE(review): confirm how valid_from is assigned.
    as_of = datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')
    response = requests.get(
        'http://localhost:8001/v1/instruments',
        params={
            'exchange': 'binance',
            'symbol': 'BTCUSDT',
            'as_of': as_of
        },
        # requests has no default timeout; without one a hung API would
        # block the CI job indefinitely.
        timeout=30,
    )

    # Step 7: Assert API response
    assert response.status_code == 200, response.text
    data = response.json()['data']
    assert data['tick_size'] == '0.01'
    assert data['exchange'] == 'binance'
# tests/conftest.py
import pytest
from fastapi.testclient import TestClient
from refdata.api.main import app
@pytest.fixture
def client():
    """Provide a FastAPI ``TestClient`` wrapping the ``app`` under test."""
    test_client = TestClient(app)
    return test_client
@pytest.fixture
def duckdb_conn():
    """Yield a fresh in-memory DuckDB connection; it is closed on teardown."""
    import duckdb

    connection = duckdb.connect(':memory:')
    yield connection
    connection.close()
@pytest.fixture
def duckdb_conn_with_data():
    """Factory fixture for an in-memory ``silver_instruments`` table.

    Yields a loader ``_load_data(records)``. Each call opens a fresh
    in-memory DuckDB connection, creates ``silver_instruments`` and inserts
    ``records``. Records are dicts keyed by exchange, symbol, tick_size,
    valid_from, valid_to and record_created_at. An omitted ``valid_to`` is
    stored as NULL, i.e. the current version. Every connection the loader
    hands out is closed when the test finishes.
    """
    import duckdb

    opened = []

    def _load_data(records: list[dict]):
        conn = duckdb.connect(':memory:')
        # Track it before anything can fail, so teardown still closes it.
        opened.append(conn)
        conn.execute("""
            CREATE TABLE silver_instruments (
                exchange STRING,
                symbol STRING,
                tick_size STRING,
                valid_from TIMESTAMP,
                valid_to TIMESTAMP,
                record_created_at TIMESTAMP
            )
        """)
        rows = [
            [
                record['exchange'],
                record['symbol'],
                record['tick_size'],
                record['valid_from'],
                record.get('valid_to'),
                record['record_created_at'],
            ]
            for record in records
        ]
        if rows:  # nothing to insert for an empty table
            conn.executemany(
                "INSERT INTO silver_instruments VALUES (?, ?, ?, ?, ?, ?)",
                rows,
            )
        return conn

    yield _load_data

    # Previously these connections were never closed.
    for conn in opened:
        conn.close()
@pytest.fixture(scope="session")
def kafka_cluster():
    """Yield the Kafka bootstrap address used by the integration tests.

    Nothing is started here. The broker is expected to be running already
    (e.g. via docker-compose; testcontainers would be an alternative).
    """
    bootstrap_servers = "localhost:9092"
    yield bootstrap_servers
@pytest.fixture(scope="session")
def iceberg_catalog():
    """Yield a PyIceberg catalog named "test" pointed at the local REST catalog."""
    from pyiceberg.catalog import load_catalog

    rest_uri = "http://localhost:8181"
    yield load_catalog("test", uri=rest_uri)
# dbt/models/silver/silver_instruments.yml
version: 2

models:
  - name: silver_instruments
    description: Bitemporal instrument specifications
    tests:
      # Row-level interval sanity check. This is deliberately a *model*-level
      # test. At column level, dbt_utils.expression_is_true prepends the
      # column name to `expression`. That would render as
      # `not(valid_to valid_to IS NULL OR ...)`, which does not compile.
      - dbt_utils.expression_is_true:
          expression: "valid_to IS NULL OR valid_to > valid_from"
    columns:
      - name: instrument_sk
        tests:
          - unique
          - not_null
      - name: exchange
        tests:
          - not_null
          - accepted_values:
              values: ['binance', 'kraken', 'bybit']
      - name: tick_size
        tests:
          - not_null
          - dbt_utils.accepted_range:
              min_value: 0
              inclusive: false
      - name: valid_from
        tests:
          - not_null
      - name: valid_to
        description: NULL for the current version of an instrument
-- dbt/tests/test_no_temporal_overlaps.sql
-- Test passes if this returns 0 rows. Each returned row is a pair of
-- versions of the same (exchange, symbol) whose validity intervals overlap.
-- Intervals are half-open [valid_from, valid_to); a NULL valid_to means
-- "still current" and is treated as far-future.
WITH overlaps AS (
    SELECT
        a.exchange,
        a.symbol,
        a.valid_from AS a_start,
        a.valid_to   AS a_end,
        b.valid_from AS b_start,
        b.valid_to   AS b_end
    FROM {{ ref('silver_instruments') }} a
    JOIN {{ ref('silver_instruments') }} b
        ON a.exchange = b.exchange
        AND a.symbol = b.symbol
        -- `<` rather than `!=` reports each overlapping pair once, not twice.
        AND a.instrument_sk < b.instrument_sk
    WHERE
        -- Explicit casts: comparing a timestamp with a bare string literal
        -- is rejected by strictly typed engines.
        a.valid_from < COALESCE(b.valid_to, CAST('9999-12-31' AS TIMESTAMP))
        AND COALESCE(a.valid_to, CAST('9999-12-31' AS TIMESTAMP)) > b.valid_from
)

-- No trailing semicolon: dbt wraps this query in a subquery when it runs
-- the test, and a `;` inside that wrapper is a syntax error.
SELECT * FROM overlaps
# .github/workflows/test.yml
name: Tests

on: [push, pull_request]

jobs:
  test:
    runs-on: ubuntu-latest

    # NOTE(review): no step below uses this database (only unit tests and
    # quality checks run). It also declares no `ports:` mapping, which steps
    # running directly on the runner need in order to reach it at
    # localhost:5432. Confirm whether it is still needed.
    services:
      postgres:
        image: postgres:15
        env:
          POSTGRES_PASSWORD: postgres
        options: >-
          --health-cmd pg_isready
          --health-interval 10s
          --health-timeout 5s
          --health-retries 5

    steps:
      # v4/v5 replace the releases that ran on the deprecated Node 16 runtime.
      - uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'

      - name: Install dependencies
        run: |
          pip install uv
          # The runner's Python is not a virtualenv; without --system,
          # `uv pip install` refuses to install.
          uv pip install --system -e ".[dev]"

      - name: Run unit tests
        run: make test-unit

      - name: Run code quality checks
        run: make quality

      - name: Upload coverage
        uses: codecov/codecov-action@v3
        with:
          # NOTE(review): assumes `make test-unit` writes coverage.xml — confirm.
          file: ./coverage.xml
# Good: Describes what is being tested
def test_query_before_change_returns_old_spec():
    pass


# Bad: Vague or unhelpful
def test_query():
    pass


def test_example():
    """Arrange / Act / Assert layout, with the external call mocked."""
    from unittest.mock import patch

    # Arrange: Set up test data and mocks
    client = BinanceClient()
    mock_response = {"symbols": []}
    # Act: Execute the code under test (HTTP is patched, so no network I/O)
    with patch('requests.get') as mock_get:
        mock_get.return_value.status_code = 200
        mock_get.return_value.json.return_value = mock_response
        result = client.fetch_instruments()
    # Assert: Verify expected behavior
    assert result == mock_response
# Good: Specific assertions with context
assert result['tick_size'] == '0.01', f"Expected 0.01 but got {result['tick_size']}"
# Bad: Generic assertion without context
assert result@pytest.fixture
def sample_instrument():
return {
'exchange': 'binance',
'symbol': 'BTCUSDT',
'tick_size': '0.01'
}
def test_with_fixture(sample_instrument):
assert sample_instrument['symbol'] == 'BTCUSDT'- SETUP.md - Development environment setup
- ARCHITECTURE.md - System architecture
- CLAUDE.md - Development standards