K2 Reference Data Platform - Testing Guide

Testing Philosophy

Core Principle: "Write tests that provide confidence, not just coverage."

What to Test

Test:

  • Business logic (bitemporal queries, SCD Type 2 transformations)
  • API contracts (request validation, response schemas)
  • Data quality rules (DBT tests)
  • Error handling at system boundaries
  • Integration points (Kafka, Iceberg, PostgreSQL)

Don't Test:

  • Trivial getters/setters
  • Third-party library internals
  • Generated code (DBT SQL unless custom macros)
  • Obvious Python builtins

Test Pyramid

        /\
       /E2E\         ← 5 tests (full pipeline, slow, ~5 min)
      /------\
     /  Integ \      ← 20 tests (component boundaries, ~1 min)
    /----------\
   /   Unit     \    ← 100+ tests (business logic, fast, <10 sec)
  /--------------\

Unit Tests (tests/unit/):

  • Fast: No Docker dependencies, no network I/O
  • Isolated: Mock external dependencies
  • Numerous: Cover edge cases, error paths

Integration Tests (tests/integration/):

  • Docker-based: Requires Kafka, MinIO, PostgreSQL
  • Component boundaries: Ingestion → Kafka, DBT → Iceberg
  • Fewer: Focus on happy path + critical failure scenarios

End-to-End Tests (tests/e2e/):

  • Full pipeline: Exchange API → Bronze → Silver → Gold → API
  • Slow: Run in CI only (not locally during development)
  • Minimal: Happy path + 1-2 critical scenarios

Test Organization

Directory Structure

tests/
├── unit/                           # Fast unit tests (no Docker)
│   ├── ingestion/
│   │   ├── test_binance_client.py
│   │   ├── test_kraken_client.py
│   │   └── test_producers.py
│   ├── query/
│   │   ├── test_bitemporal_queries.py
│   │   └── test_connection_pool.py
│   └── api/
│       ├── test_instruments_router.py
│       └── test_symbology_router.py
├── integration/                    # Docker-based integration tests
│   ├── test_kafka_ingestion.py
│   ├── test_dbt_transformations.py
│   └── test_api_queries.py
├── e2e/                            # End-to-end pipeline tests
│   ├── test_binance_to_api.py
│   └── test_symbology_pipeline.py
├── conftest.py                     # Pytest fixtures (shared)
└── fixtures/                       # Test data
    ├── binance_response.json
    └── kraken_response.json
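
Because the directory layout already mirrors the three test tiers, the tier marker could be derived from a test's location instead of being repeated on every function. The following is a minimal sketch of that idea, not something the project is known to use: `DIRECTORY_MARKERS` and `marker_for_path` are hypothetical names, and `item.path` assumes pytest 7 or later.

```python
# tests/conftest.py (sketch): derive the tier marker from the test's directory
from pathlib import Path
from typing import Optional

# Hypothetical mapping: directory directly under tests/ -> marker name
DIRECTORY_MARKERS = {"unit": "unit", "integration": "integration", "e2e": "e2e"}


def marker_for_path(path: Path) -> Optional[str]:
    """Return the marker implied by a test file's location, if any."""
    parts = path.parts
    if "tests" not in parts:
        return None
    idx = parts.index("tests")
    if idx + 1 < len(parts):
        return DIRECTORY_MARKERS.get(parts[idx + 1])
    return None


def pytest_collection_modifyitems(config, items):
    """Pytest hook: add the directory-implied marker to each collected test."""
    for item in items:
        marker = marker_for_path(Path(str(item.path)))
        if marker is not None:
            item.add_marker(marker)
```

Explicit decorators still work alongside this hook; it only adds the tier marker to tests whose location already implies one.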

Test Markers

Tests are categorized using pytest markers:

import pytest

@pytest.mark.unit
def test_fast_business_logic():
    """Fast test, no external dependencies"""
    pass

@pytest.mark.integration
def test_kafka_connection():
    """Requires Docker services"""
    pass

@pytest.mark.e2e
def test_full_pipeline():
    """End-to-end test, slow"""
    pass

@pytest.mark.bitemporal
def test_point_in_time_query():
    """Tests bitemporal query logic"""
    pass

@pytest.mark.scd2
def test_scd_type_2_transformation():
    """Tests SCD Type 2 logic"""
    pass

@pytest.mark.slow
def test_expensive_operation():
    """Slow test (>1 second)"""
    pass
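
Custom markers like the ones above normally need to be registered, otherwise pytest warns about unknown marks (and rejects them under `--strict-markers`). One way to do that, sketched here under the assumption that registration lives in `conftest.py` rather than in a config file, is the `pytest_configure` hook. The `MARKERS` dictionary is a hypothetical name; the descriptions are taken from the docstrings above.

```python
# tests/conftest.py (sketch): register the custom markers used in this guide
MARKERS = {
    "unit": "fast test, no external dependencies",
    "integration": "requires Docker services",
    "e2e": "end-to-end test, slow",
    "bitemporal": "tests bitemporal query logic",
    "scd2": "tests SCD Type 2 logic",
    "slow": "slow test (>1 second)",
}


def pytest_configure(config):
    """Pytest hook: declare each marker so pytest recognizes it."""
    for name, description in MARKERS.items():
        config.addinivalue_line("markers", f"{name}: {description}")
```

After registration, `pytest --markers` lists the six markers with their descriptions.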

Running Tests

Basic Commands

# All unit tests (fast, default)
make test-unit

# All integration tests (requires Docker)
make test-integration

# All tests (unit + integration + e2e)
make test-all

# With coverage report
make coverage

Pytest Options

# Run specific test file
pytest tests/unit/ingestion/test_binance_client.py -v

# Run specific test function
pytest tests/unit/query/test_bitemporal_queries.py::test_query_before_change_returns_old_spec -v

# Run tests matching pattern
pytest -k "binance" -v

# Run tests with specific marker
pytest -m bitemporal -v

# Skip slow tests
pytest -m "not slow" -v

# Verbose output
pytest -vv

# Stop on first failure
pytest -x

# Show print statements
pytest -s

# Parallel execution (requires pytest-xdist)
pytest -n auto

Unit Test Examples

Testing Exchange Clients

# tests/unit/ingestion/test_binance_client.py
import pytest
from unittest.mock import patch, Mock
from refdata.ingestion.sources.binance import BinanceClient

@pytest.mark.unit
def test_fetch_instruments_success():
    """
    Given: Binance API returns valid response
    When: fetch_instruments() is called
    Then: Returns parsed instruments dictionary
    """
    # Arrange
    client = BinanceClient()
    mock_response = {
        "symbols": [
            {
                "symbol": "BTCUSDT",
                "baseAsset": "BTC",
                "quoteAsset": "USDT",
                "status": "TRADING"
            }
        ]
    }

    # Act
    with patch('requests.get') as mock_get:
        mock_get.return_value.json.return_value = mock_response
        mock_get.return_value.status_code = 200
        result = client.fetch_instruments()

    # Assert
    assert result == mock_response
    assert len(result["symbols"]) == 1
    assert result["symbols"][0]["symbol"] == "BTCUSDT"


@pytest.mark.unit
def test_fetch_instruments_retry_on_429():
    """
    Given: Binance API returns 429 (rate limit)
    When: fetch_instruments() is called
    Then: Retries with exponential backoff
    """
    client = BinanceClient()

    with patch('requests.get') as mock_get:
        # First call: 429, second call: 200
        mock_get.side_effect = [
            Mock(status_code=429, text="Rate limit exceeded"),
            Mock(status_code=200, json=lambda: {"symbols": []})
        ]

        result = client.fetch_instruments()

        # Should have retried
        assert mock_get.call_count == 2
        assert result == {"symbols": []}


@pytest.mark.unit
def test_fetch_instruments_invalid_response():
    """
    Given: Binance API returns malformed JSON
    When: fetch_instruments() is called
    Then: Raises ValueError with descriptive message
    """
    client = BinanceClient()

    with patch('requests.get') as mock_get:
        # A 200 response whose body cannot be parsed as JSON
        mock_get.return_value.status_code = 200
        mock_get.return_value.json.side_effect = ValueError("Invalid JSON")

        with pytest.raises(ValueError, match="Invalid JSON"):
            client.fetch_instruments()
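
The retry test above expects a 429 response to be retried with exponential backoff. The source does not show how `BinanceClient` implements this, so the following is only a sketch of the behavior the test exercises. `fetch_with_retry` and its parameters are hypothetical; the HTTP call and the sleep function are injected so the logic can be unit-tested without a network or real delays.

```python
import time
from typing import Any, Callable


def fetch_with_retry(
    get: Callable[[], Any],
    max_attempts: int = 3,
    base_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> Any:
    """Call get() until it returns 200 or the attempts run out.

    Hypothetical helper: `get` performs one HTTP request and returns an
    object with `status_code` and `json()`, e.g. lambda: requests.get(url).
    """
    for attempt in range(max_attempts):
        response = get()
        if response.status_code == 200:
            return response.json()
        if response.status_code != 429:
            raise RuntimeError(f"Unexpected status {response.status_code}")
        if attempt < max_attempts - 1:
            # Exponential backoff: base_delay, 2 * base_delay, 4 * base_delay, ...
            sleep(base_delay * (2 ** attempt))
    raise RuntimeError(f"Rate limited after {max_attempts} attempts")
```

Passing `sleep=delays.append` in a test records the backoff schedule instead of waiting, which keeps the unit suite fast.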

Testing Bitemporal Queries

# tests/unit/query/test_bitemporal_queries.py
import pytest
from datetime import datetime
from refdata.query.bitemporal import query_instruments_as_of

@pytest.mark.unit
@pytest.mark.bitemporal
def test_query_before_change_returns_old_spec(duckdb_conn_with_data):
    """
    Given: Instrument with tick_size change on 2024-01-15
    When: Query as_of 2024-01-14
    Then: Returns old tick_size (0.01)
    """
    # Arrange
    conn = duckdb_conn_with_data([
        {
            'exchange': 'binance',
            'symbol': 'BTCUSDT',
            'tick_size': '0.01',
            'valid_from': datetime(2024, 1, 10),
            'valid_to': datetime(2024, 1, 15),
            'record_created_at': datetime(2024, 1, 11, 15, 0)
        },
        {
            'exchange': 'binance',
            'symbol': 'BTCUSDT',
            'tick_size': '0.05',
            'valid_from': datetime(2024, 1, 15),
            'valid_to': None,
            'record_created_at': datetime(2024, 1, 14, 10, 0)
        }
    ])

    # Act
    result = query_instruments_as_of(
        conn=conn,
        exchange='binance',
        symbol='BTCUSDT',
        as_of=datetime(2024, 1, 14, 12, 0)
    )

    # Assert
    assert result is not None
    assert result['tick_size'] == '0.01'
    assert result['valid_from'] == datetime(2024, 1, 10)


@pytest.mark.unit
@pytest.mark.bitemporal
def test_late_correction_preserves_history(duckdb_conn_with_data):
    """
    Given: Initial change record + late correction
    When: Query as_of original timestamp
    Then: Returns corrected state (latest record_created_at wins)
    """
    # Arrange
    conn = duckdb_conn_with_data([
        # Original record: announced effective Jan 15
        {
            'exchange': 'binance',
            'symbol': 'BTCUSDT',
            'tick_size': '0.05',
            'valid_from': datetime(2024, 1, 15, 0, 0),
            'valid_to': None,
            'record_created_at': datetime(2024, 1, 14, 10, 0)
        },
        # Correction: actually effective Jan 14 23:00
        {
            'exchange': 'binance',
            'symbol': 'BTCUSDT',
            'tick_size': '0.05',
            'valid_from': datetime(2024, 1, 14, 23, 0),
            'valid_to': None,
            'record_created_at': datetime(2024, 1, 16, 8, 0)
        }
    ])

    # Act: Query between corrected and original valid_from
    result = query_instruments_as_of(
        conn=conn,
        exchange='binance',
        symbol='BTCUSDT',
        as_of=datetime(2024, 1, 14, 23, 30)
    )

    # Assert: Should return corrected record
    assert result is not None
    assert result['tick_size'] == '0.05'
    assert result['record_created_at'] == datetime(2024, 1, 16, 8, 0)


@pytest.mark.unit
@pytest.mark.bitemporal
def test_query_nonexistent_instrument_returns_none(duckdb_conn_with_data):
    """
    Given: Empty database
    When: Query for nonexistent instrument
    Then: Returns None (not error)
    """
    conn = duckdb_conn_with_data([])

    result = query_instruments_as_of(
        conn=conn,
        exchange='binance',
        symbol='NONEXISTENT',
        as_of=datetime(2024, 1, 15)
    )

    assert result is None
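
The selection rule these three tests imply can be stated in a few lines of plain Python. This is a sketch of the rule only, not the actual `query_instruments_as_of` (which runs against DuckDB). `select_as_of` is a hypothetical name, and it assumes half-open validity intervals `[valid_from, valid_to)` with `None` meaning "still current".

```python
from datetime import datetime
from typing import Optional


def select_as_of(records: list[dict], as_of: datetime) -> Optional[dict]:
    """Pick the record valid at `as_of`; the latest record_created_at wins."""
    candidates = [
        r for r in records
        if r["valid_from"] <= as_of
        and (r["valid_to"] is None or as_of < r["valid_to"])
    ]
    if not candidates:
        return None  # Nonexistent instrument: None, not an error
    return max(candidates, key=lambda r: r["record_created_at"])


# Same shape as the fixture data in test_query_before_change_returns_old_spec
history = [
    {"tick_size": "0.01", "valid_from": datetime(2024, 1, 10),
     "valid_to": datetime(2024, 1, 15),
     "record_created_at": datetime(2024, 1, 11, 15, 0)},
    {"tick_size": "0.05", "valid_from": datetime(2024, 1, 15),
     "valid_to": None,
     "record_created_at": datetime(2024, 1, 14, 10, 0)},
]
old_spec = select_as_of(history, datetime(2024, 1, 14, 12, 0))
```

Here `old_spec` is the 0.01 record, because the 0.05 record only becomes valid on 2024-01-15.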

Testing API Endpoints

# tests/unit/api/test_instruments_router.py
import pytest
from fastapi.testclient import TestClient
from datetime import datetime

@pytest.mark.unit
def test_get_instruments_success(client, mock_query_fn):
    """
    Given: Valid query parameters
    When: GET /v1/instruments
    Then: Returns 200 with instrument data
    """
    # Arrange
    mock_query_fn.return_value = {
        'exchange': 'binance',
        'symbol': 'BTCUSDT',
        'tick_size': '0.01',
        'valid_from': datetime(2024, 1, 10)
    }

    # Act
    response = client.get(
        "/v1/instruments",
        params={
            "exchange": "binance",
            "symbol": "BTCUSDT",
            "as_of": "2024-01-15T10:00:00Z"
        }
    )

    # Assert
    assert response.status_code == 200
    data = response.json()
    assert data['data']['symbol'] == 'BTCUSDT'
    assert data['data']['tick_size'] == '0.01'


@pytest.mark.unit
def test_get_instruments_missing_required_param(client):
    """
    Given: Missing required parameter (exchange)
    When: GET /v1/instruments
    Then: Returns 422 validation error
    """
    response = client.get(
        "/v1/instruments",
        params={"symbol": "BTCUSDT"}
    )

    assert response.status_code == 422
    error = response.json()
    assert "exchange" in str(error)


@pytest.mark.unit
def test_get_instruments_not_found(client, mock_query_fn):
    """
    Given: Instrument does not exist
    When: GET /v1/instruments
    Then: Returns 404 with error message
    """
    mock_query_fn.return_value = None

    response = client.get(
        "/v1/instruments",
        params={
            "exchange": "binance",
            "symbol": "NONEXISTENT",
            "as_of": "2024-01-15T10:00:00Z"
        }
    )

    assert response.status_code == 404
    error = response.json()
    assert error['error']['code'] == 'INSTRUMENT_NOT_FOUND'

Integration Test Examples

Testing Kafka Ingestion

# tests/integration/test_kafka_ingestion.py
import pytest
import json
import time
from confluent_kafka import Consumer

@pytest.mark.integration
def test_binance_ingestion_publishes_to_kafka(
    kafka_cluster,
    binance_client
):
    """
    Given: Binance client with mocked API response
    When: Ingestion job runs
    Then: Message published to Kafka topic
    """
    # Arrange
    topic = 'refdata.instruments.binance.raw'
    consumer = Consumer({
        'bootstrap.servers': kafka_cluster,  # Address provided by the fixture
        'group.id': 'test-consumer',
        'auto.offset.reset': 'earliest'
    })
    consumer.subscribe([topic])

    try:
        # Act
        binance_client.ingest()

        # Assert: Consume message from Kafka
        msg = consumer.poll(timeout=10.0)
        assert msg is not None
        assert msg.error() is None

        value = json.loads(msg.value().decode('utf-8'))
        assert 'symbols' in value
        assert len(value['symbols']) > 0
    finally:
        # Always release the consumer, even when an assertion fails
        consumer.close()


@pytest.mark.integration
def test_idempotent_ingestion_no_duplicates(
    kafka_cluster,
    binance_client
):
    """
    Given: Same API response ingested twice
    When: Ingestion runs twice with same data
    Then: Only one message in Kafka (idempotency)
    """
    # Arrange
    topic = 'refdata.instruments.binance.raw'

    # Act
    ingestion_id_1 = binance_client.ingest()
    ingestion_id_2 = binance_client.ingest()

    # Assert: Same ingestion_id = idempotent
    assert ingestion_id_1 == ingestion_id_2

    # Verify Kafka has only one message
    consumer = Consumer({
        'bootstrap.servers': kafka_cluster,  # Address provided by the fixture
        'group.id': 'test-idempotency',
        'auto.offset.reset': 'earliest'
    })
    consumer.subscribe([topic])

    messages = []
    deadline = time.monotonic() + 15.0
    try:
        # Early polls can return None while partitions are still being
        # assigned, so stop only on an empty poll that follows a message.
        while time.monotonic() < deadline:
            msg = consumer.poll(timeout=1.0)
            if msg is None:
                if messages:
                    break
                continue
            assert msg.error() is None
            messages.append(msg)
    finally:
        consumer.close()

    # Should have exactly 1 message (deduplicated)
    assert len(messages) == 1
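
The idempotency test relies on `ingest()` returning the same `ingestion_id` for identical data. The source does not say how that ID is derived. One common approach, shown here purely as an assumption, is a content hash over a canonical form of the payload. `compute_ingestion_id` is a hypothetical name.

```python
import hashlib
import json


def compute_ingestion_id(exchange: str, payload: dict) -> str:
    """Derive a stable ID from the exchange name and the canonicalized payload."""
    # sort_keys plus fixed separators make the JSON text independent of key order
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    digest = hashlib.sha256(f"{exchange}:{canonical}".encode("utf-8"))
    return digest.hexdigest()


first = compute_ingestion_id("binance", {"symbols": [{"symbol": "BTCUSDT", "status": "TRADING"}]})
second = compute_ingestion_id("binance", {"symbols": [{"status": "TRADING", "symbol": "BTCUSDT"}]})
```

`first` and `second` are equal because only the key order differs. A producer could use such an ID as the message key or as a lookup key to skip duplicate publishes.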

Testing DBT Transformations

# tests/integration/test_dbt_transformations.py
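import pytest

# insert_bronze_record and query_silver are project test helpers
# (their definitions are not shown in this guide)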

@pytest.mark.integration
@pytest.mark.scd2
def test_dbt_scd2_closes_old_record_on_change(
    dbt_project,
    iceberg_catalog
):
    """
    Given: Existing Silver record for BTCUSDT
    When: DBT processes new Bronze record with changed tick_size
    Then: Old record gets valid_to set, new record inserted
    """
    # Arrange: Insert Bronze record
    insert_bronze_record(iceberg_catalog, {
        'exchange': 'binance',
        'symbol': 'BTCUSDT',
        'tick_size': '0.01',
        'valid_from': '2024-01-10 00:00:00'
    })

    # Act: Run DBT transformation (initial load)
    dbt_project.run('run --select silver_instruments')

    silver_before = query_silver(iceberg_catalog, 'binance', 'BTCUSDT')
    assert len(silver_before) == 1
    assert silver_before[0]['valid_to'] is None  # Current record

    # Arrange: Insert new Bronze record with change
    insert_bronze_record(iceberg_catalog, {
        'exchange': 'binance',
        'symbol': 'BTCUSDT',
        'tick_size': '0.05',  # Changed!
        'valid_from': '2024-01-15 00:00:00'
    })

    # Act: Run DBT transformation (incremental)
    dbt_project.run('run --select silver_instruments')

    # Assert
    silver_after = query_silver(iceberg_catalog, 'binance', 'BTCUSDT')
    assert len(silver_after) == 2  # Old + New

    old_record = [r for r in silver_after if r['tick_size'] == '0.01'][0]
    new_record = [r for r in silver_after if r['tick_size'] == '0.05'][0]

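    # Old record closed no later than the new record's start
    # (intervals are half-open, so valid_to may equal the next valid_from)
    assert old_record['valid_to'] is not None
    assert old_record['valid_to'] <= new_record['valid_from']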

    # New record is current
    assert new_record['valid_to'] is None
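
The close-and-insert behavior this test checks can be illustrated outside DBT with a small pure-Python function. This is a sketch of the SCD Type 2 rule under simplifying assumptions, not the project's DBT model: `apply_scd2_change` is a hypothetical name, the history holds a single instrument, `tick_size` is the only tracked attribute, and the old version is closed exactly at the new version's `valid_from`.

```python
from datetime import datetime


def apply_scd2_change(history: list[dict], incoming: dict) -> list[dict]:
    """Return a new history with `incoming` applied as an SCD Type 2 change.

    Illustrative only: the current record is the one whose valid_to is None.
    """
    current = next((r for r in history if r["valid_to"] is None), None)
    if current is not None and current["tick_size"] == incoming["tick_size"]:
        return list(history)  # No attribute change: nothing to version
    updated = []
    for record in history:
        if record is current:
            # Close the old version at the moment the new one takes effect
            record = {**record, "valid_to": incoming["valid_from"]}
        updated.append(record)
    updated.append({**incoming, "valid_to": None})
    return updated


silver = [{"symbol": "BTCUSDT", "tick_size": "0.01",
           "valid_from": datetime(2024, 1, 10), "valid_to": None}]
silver_after = apply_scd2_change(
    silver,
    {"symbol": "BTCUSDT", "tick_size": "0.05", "valid_from": datetime(2024, 1, 15)},
)
```

`silver_after` holds two versions: the 0.01 record closed at 2024-01-15 and the 0.05 record left open. The input list is not mutated.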

End-to-End Test Examples

Full Pipeline Test

# tests/e2e/test_binance_to_api.py
import pytest
import requests
from unittest.mock import patch

@pytest.mark.e2e
def test_full_pipeline_binance_to_api(
    kafka_cluster,
    iceberg_catalog,
    dbt_runner,
    fastapi_app
):
    """
    End-to-end: Binance API → Bronze → DBT → Silver → API query
    """
    # Step 1: Mock Binance API response
    mock_binance_response = {
        'symbols': [{
            'symbol': 'BTCUSDT',
            'baseAsset': 'BTC',
            'quoteAsset': 'USDT',
            'tickSize': '0.01',
            'status': 'TRADING'
        }]
    }

    with patch('requests.get') as mock_get:
        mock_get.return_value.json.return_value = mock_binance_response
        mock_get.return_value.status_code = 200

        # Step 2: Run ingestion
        from refdata.cli.ingest import ingest_binance
        ingest_binance()

    # Step 3: Verify Bronze record created
    bronze_records = query_kafka_topic('refdata.instruments.binance.raw')
    assert len(bronze_records) == 1
    assert bronze_records[0]['symbols'][0]['symbol'] == 'BTCUSDT'

    # Step 4: Run DBT transformation
    dbt_runner.run('--select silver_instruments')

    # Step 5: Verify Silver record created
    silver_records = query_iceberg_table('refdata.silver_instruments')
    assert len(silver_records) == 1
    assert silver_records[0]['tick_size'] == '0.01'

    # Step 6: Query API
    response = requests.get(
        'http://localhost:8001/v1/instruments',
        params={
            'exchange': 'binance',
            'symbol': 'BTCUSDT',
            'as_of': '2024-01-11T00:00:00Z'
        }
    )

    # Step 7: Assert API response
    assert response.status_code == 200
    data = response.json()['data']
    assert data['tick_size'] == '0.01'
    assert data['exchange'] == 'binance'
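
The steps above assert immediately after each stage. If any stage completes asynchronously (an assumption; the source does not say), a small polling helper can make the assertions less timing-sensitive. `wait_until` is a hypothetical helper, not part of the project.

```python
import time
from typing import Callable


def wait_until(
    predicate: Callable[[], bool],
    timeout: float = 30.0,
    interval: float = 0.5,
) -> bool:
    """Poll `predicate` until it returns True or `timeout` seconds elapse."""
    deadline = time.monotonic() + timeout
    while True:
        if predicate():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)
```

A step could then read, for example, `assert wait_until(lambda: len(query_iceberg_table('refdata.silver_instruments')) == 1)`, using the same helper name as the test above.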

Fixtures

Pytest Fixtures (conftest.py)

# tests/conftest.py
import pytest
from fastapi.testclient import TestClient
from refdata.api.main import app

@pytest.fixture
def client():
    """FastAPI test client"""
    return TestClient(app)


@pytest.fixture
def duckdb_conn():
    """In-memory DuckDB connection"""
    import duckdb
    conn = duckdb.connect(':memory:')
    yield conn
    conn.close()


@pytest.fixture
def duckdb_conn_with_data():
    """DuckDB connection with test data loader"""
    import duckdb

    connections = []  # Track connections so teardown can close them

    def _load_data(records: list[dict]):
        conn = duckdb.connect(':memory:')
        connections.append(conn)
        # Create table and insert records
        conn.execute("""
            CREATE TABLE silver_instruments (
                exchange STRING,
                symbol STRING,
                tick_size STRING,
                valid_from TIMESTAMP,
                valid_to TIMESTAMP,
                record_created_at TIMESTAMP
            )
        """)
        for record in records:
            conn.execute("""
                INSERT INTO silver_instruments VALUES (?, ?, ?, ?, ?, ?)
            """, [
                record['exchange'],
                record['symbol'],
                record['tick_size'],
                record['valid_from'],
                record['valid_to'],
                record['record_created_at']
            ])
        return conn

    yield _load_data

    # Teardown: close every connection the test opened
    for conn in connections:
        conn.close()


@pytest.fixture(scope="session")
def kafka_cluster():
    """Kafka bootstrap address for integration tests"""
    # Assumes docker-compose services are already running;
    # testcontainers could start the cluster here instead
    yield "localhost:9092"


@pytest.fixture(scope="session")
def iceberg_catalog():
    """Iceberg REST catalog for integration tests"""
    from pyiceberg.catalog import load_catalog
    catalog = load_catalog("test", uri="http://localhost:8181")
    yield catalog

DBT Tests

Built-in Tests (YAML)

# dbt/models/silver/silver_instruments.yml
version: 2

models:
  - name: silver_instruments
    description: Bitemporal instrument specifications

    columns:
      - name: instrument_sk
        tests:
          - unique
          - not_null

      - name: exchange
        tests:
          - not_null
          - accepted_values:
              values: ['binance', 'kraken', 'bybit']

      - name: tick_size
        tests:
          - not_null
          - dbt_utils.accepted_range:
              min_value: 0
              inclusive: false

      - name: valid_from
        tests:
          - not_null

      - name: valid_to
        tests:
          - dbt_utils.expression_is_true:
              expression: "valid_to IS NULL OR valid_to > valid_from"

Custom Tests (SQL)

-- dbt/tests/test_no_temporal_overlaps.sql
WITH overlaps AS (
    SELECT
        a.exchange,
        a.symbol,
        a.valid_from AS a_start,
        a.valid_to AS a_end,
        b.valid_from AS b_start,
        b.valid_to AS b_end
    FROM {{ ref('silver_instruments') }} a
    JOIN {{ ref('silver_instruments') }} b
        ON a.exchange = b.exchange
        AND a.symbol = b.symbol
        AND a.instrument_sk != b.instrument_sk
    WHERE
        a.valid_from < COALESCE(b.valid_to, '9999-12-31')
        AND COALESCE(a.valid_to, '9999-12-31') > b.valid_from
)
-- Test passes if this returns 0 rows
-- (no trailing semicolon: DBT wraps the test query in a subquery)
SELECT * FROM overlaps
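
The overlap predicate in the `WHERE` clause treats validity periods as half-open intervals, so two versions that merely touch (one ends exactly where the next begins) do not count as overlapping. The same predicate in Python, as a sketch for reasoning about edge cases; `intervals_overlap` and `FAR_FUTURE` are hypothetical names.

```python
from datetime import datetime
from typing import Optional

FAR_FUTURE = datetime(9999, 12, 31)  # Stand-in for an open-ended valid_to


def intervals_overlap(
    a_start: datetime, a_end: Optional[datetime],
    b_start: datetime, b_end: Optional[datetime],
) -> bool:
    """Mirror the SQL predicate: do [a_start, a_end) and [b_start, b_end) overlap?"""
    a_end = FAR_FUTURE if a_end is None else a_end
    b_end = FAR_FUTURE if b_end is None else b_end
    return a_start < b_end and a_end > b_start


# Adjacent versions: the first closes exactly when the second opens
adjacent = intervals_overlap(
    datetime(2024, 1, 10), datetime(2024, 1, 15),
    datetime(2024, 1, 15), None,
)
# Two open-ended versions of the same instrument
both_open = intervals_overlap(
    datetime(2024, 1, 10), None,
    datetime(2024, 1, 15), None,
)
```

`adjacent` is `False` and `both_open` is `True`, which is the case the custom test is meant to catch.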

Continuous Integration

GitHub Actions Workflow

# .github/workflows/test.yml
name: Tests

on: [push, pull_request]

jobs:
  test:
    runs-on: ubuntu-latest

    services:
      postgres:
        image: postgres:15
        env:
          POSTGRES_PASSWORD: postgres
        options: >-
          --health-cmd pg_isready
          --health-interval 10s
          --health-timeout 5s
          --health-retries 5

    steps:
      - uses: actions/checkout@v3

      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'

      - name: Install dependencies
        run: |
          pip install uv
          uv pip install --system -e ".[dev]"

      - name: Run unit tests
        run: make test-unit

      - name: Run code quality checks
        run: make quality

      - name: Upload coverage
        uses: codecov/codecov-action@v3
        with:
          file: ./coverage.xml

Best Practices

Test Naming

# Good: Describes what is being tested
def test_query_before_change_returns_old_spec():
    pass

# Bad: Vague or unhelpful
def test_query():
    pass

Arrange-Act-Assert Pattern

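def test_example():
    # Arrange: Set up test data and mocks
    client = BinanceClient()
    mock_response = {"symbols": []}

    # Act: Execute the code under test (HTTP call mocked)
    with patch('requests.get') as mock_get:
        mock_get.return_value.json.return_value = mock_response
        mock_get.return_value.status_code = 200
        result = client.fetch_instruments()

    # Assert: Verify expected behavior
    assert result == mock_response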

Meaningful Assertions

# Good: Specific assertions with context
assert result['tick_size'] == '0.01', f"Expected 0.01 but got {result['tick_size']}"

# Bad: Generic assertion without context
assert result

Use Fixtures for Common Setup

@pytest.fixture
def sample_instrument():
    return {
        'exchange': 'binance',
        'symbol': 'BTCUSDT',
        'tick_size': '0.01'
    }

def test_with_fixture(sample_instrument):
    assert sample_instrument['symbol'] == 'BTCUSDT'

Related Documentation