K2 Reference Data Platform - System Architecture

Overview

The K2 Reference Data Platform is a production-grade system for managing cryptocurrency instrument reference data with full temporal fidelity. It ingests specifications from multiple exchanges, transforms them through a medallion architecture, and exposes them via a RESTful API.

High-Level Architecture

┌──────────────────────────────────────────────────────────────────────┐
│                        INGESTION LAYER                               │
│  ┌────────────┐  ┌────────────┐  ┌────────────┐                    │
│  │  Binance   │  │   Kraken   │  │   Bybit    │  REST Clients      │
│  │  Client    │  │   Client   │  │   Client   │  (Python/httpx)    │
│  └──────┬─────┘  └──────┬─────┘  └──────┬─────┘                    │
│         │                │                │                           │
│         └────────────────┴────────────────┘                          │
│                          │                                            │
│                   ┌──────▼──────┐                                    │
│                   │   Kafka     │  Topics: refdata.*.raw             │
│                   │   Topics    │  Avro Serialization                │
│                   └──────┬──────┘  Idempotent Producer              │
└──────────────────────────┼───────────────────────────────────────────┘
                           │
┌──────────────────────────▼───────────────────────────────────────────┐
│                     STORAGE LAYER (Iceberg)                          │
│                                                                       │
│  ┌────────────────────────────────────────────────────────────────┐ │
│  │  BRONZE: Raw Event Log                                         │ │
│  │  - bronze_instruments_binance (JSON, 7-day retention)          │ │
│  │  - bronze_instruments_kraken (JSON, 7-day retention)           │ │
│  │  - Partitioned by: days(ingestion_timestamp)                   │ │
│  └────────────────────────────────────────────────────────────────┘ │
│                           │                                           │
│                           │ DBT Transformations (SCD Type 2)         │
│                           ▼                                           │
│  ┌────────────────────────────────────────────────────────────────┐ │
│  │  SILVER: Bitemporal Dimensions                                 │ │
│  │  - silver_instruments (SCD Type 2 + Bitemporal)                │ │
│  │  - Partitioned by: exchange, months(valid_from)                │ │
│  │  - Columns: exchange, symbol, valid_from, valid_to,            │ │
│  │             record_created_at, tick_size, lot_size, etc.       │ │
│  └────────────────────────────────────────────────────────────────┘ │
│                           │                                           │
│                           │ DBT Symbology Mapping                    │
│                           ▼                                           │
│  ┌────────────────────────────────────────────────────────────────┐ │
│  │  GOLD: Analytics-Ready                                         │ │
│  │  - gold_symbology_master (Canonical IDs)                       │ │
│  │  - Partitioned by: truncate(base_asset, 1)                     │ │
│  │  - Cross-exchange mapping: binance_symbol, kraken_symbol, etc. │ │
│  └────────────────────────────────────────────────────────────────┘ │
└───────────────────────────────────────────────────────────────────────┘
                           │
┌──────────────────────────▼───────────────────────────────────────────┐
│                      QUERY LAYER                                     │
│  ┌────────────────────────────────────────────────────────────────┐ │
│  │  FastAPI Application                                           │ │
│  │  - Point-in-time queries (bitemporal logic)                    │ │
│  │  - Symbology lookups (canonical ↔ exchange)                    │ │
│  │  - Audit trail (historical changes)                            │ │
│  │  - Health checks                                               │ │
│  └────────────────────────────────────────────────────────────────┘ │
│                           │                                           │
│  ┌────────────────────────▼───────────────────────────────────────┐ │
│  │  DuckDB Query Engine                                           │ │
│  │  - In-process OLAP engine                                      │ │
│  │  - Iceberg extension (reads directly from S3/MinIO)            │ │
│  │  - Connection pooling (5-50 connections)                       │ │
│  └────────────────────────────────────────────────────────────────┘ │
└───────────────────────────────────────────────────────────────────────┘
                           │
                           ▼
                   ┌───────────────┐
                   │   API Clients │  Trading systems, analysts, etc.
                   └───────────────┘

Component Breakdown

1. Ingestion Layer

Purpose: Fetch instrument specifications from exchange APIs and publish to Kafka

Components:

  • Exchange Clients (src/refdata/ingestion/sources/)

    • REST clients with rate limiting (10 req/sec)
    • Retry logic with exponential backoff (3 attempts)
    • Response validation and parsing
    • Supported exchanges: Binance, Kraken, Bybit (future)
  • Kafka Producer (src/refdata/ingestion/producers.py)

    • Idempotent producer (enable.idempotence=true)
    • Avro serialization via Schema Registry
    • Deterministic message IDs (MD5 hash)
    • Change detection (skip publish if no changes)
  • Scheduler (src/refdata/cli/ingest.py)

    • Cron-based polling (default: hourly)
    • Manual trigger support (make ingest-now)
    • State store (PostgreSQL) tracks last successful ingestion
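
The retry behaviour listed for the exchange clients (exponential backoff, 3 attempts) can be sketched as below. This is a minimal illustration, not the platform's client code: the function name fetch_with_retry, the 1-second base delay, and the 10% jitter are assumptions, and the real clients use httpx rather than a bare callable.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def fetch_with_retry(
    fetch: Callable[[], T],
    max_attempts: int = 3,          # matches the "3 attempts" above
    base_delay: float = 1.0,        # assumed base delay in seconds
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fetch(); on failure wait base_delay * 2**(attempt - 1), then retry."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return fetch()
        except Exception:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error to the caller
            delay = base_delay * 2 ** (attempt - 1)
            # Small random jitter so several clients do not retry in lockstep.
            sleep(delay + random.uniform(0, delay * 0.1))
    raise AssertionError("unreachable")
```

Passing the sleep function as a parameter keeps the helper easy to test without real waiting.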

Data Flow:

  1. Scheduler triggers ingestion job
  2. Exchange client fetches /exchangeInfo (or equivalent)
  3. Compute response hash; compare with last ingestion
  4. If changed: Serialize to Avro, publish to Kafka
  5. Update state store with hash and timestamp
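
Steps 3 to 5 above can be sketched as follows. This is an illustrative outline under stated assumptions: the state store is modelled as a plain dict instead of PostgreSQL, publish is a hypothetical callback standing in for the Avro/Kafka producer, and hashing a key-sorted JSON form is one possible way to make the hash stable.

```python
import hashlib
import json
from datetime import datetime, timezone
from typing import Callable, Dict


def response_hash(payload: dict) -> str:
    """MD5 of a canonical JSON form, so key order alone never looks like a change."""
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    return hashlib.md5(canonical.encode("utf-8")).hexdigest()


def ingest_once(
    exchange: str,
    payload: dict,
    state: Dict[str, dict],                  # stand-in for the PostgreSQL state store
    publish: Callable[[str, dict], None],    # hypothetical Kafka publish callback
) -> bool:
    """Publish only if the response changed; always record hash and timestamp."""
    new_hash = response_hash(payload)
    previous = state.get(exchange)
    changed = previous is None or previous["hash"] != new_hash
    if changed:
        publish(exchange, payload)
    state[exchange] = {
        "hash": new_hash,
        "ingested_at": datetime.now(timezone.utc).isoformat(),
    }
    return changed
```

A second call with the same content (even with keys in a different order) returns False and skips the publish.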

Topics:

  • refdata.instruments.binance.raw (1 partition)
  • refdata.instruments.kraken.raw (1 partition)
  • Retention: 7 days (sufficient for replay)

2. Storage Layer (Apache Iceberg)

Purpose: Durable storage with ACID properties, schema evolution, and time-travel

Bronze Layer:

  • Tables: bronze_instruments_binance, bronze_instruments_kraken
  • Schema: (ingestion_id, ingestion_timestamp, api_response_raw TEXT)
  • Partitioning: days(ingestion_timestamp) (daily partitions)
  • Format: Parquet with Zstandard compression
  • Retention: 7 days (can be replayed if Silver needs schema changes)

Silver Layer:

  • Table: silver_instruments
  • Schema: Structured columns + vendor_data JSON escape hatch
  • Partitioning: (exchange, months(valid_from))
    • Exchange filter prunes partitions (90% of queries specify exchange)
    • Monthly granularity (not daily) - reference data changes infrequently
  • SCD Type 2: valid_from, valid_to (business time)
  • Bitemporal: record_created_at, record_updated_at (system time)
  • Target partition size: 100+ MB (Iceberg best practice)
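
As a rough illustration of the SCD Type 2 bookkeeping described above, the sketch below closes the current version of an instrument and opens a new one when a specification changes. The class and function names are hypothetical, only tick_size is tracked for brevity, and the actual transformation is performed in SQL by DBT.

```python
from dataclasses import dataclass, replace
from datetime import datetime
from typing import List, Optional


@dataclass(frozen=True)
class InstrumentVersion:
    exchange: str
    symbol: str
    tick_size: str
    valid_from: datetime                    # business time: start of validity
    valid_to: Optional[datetime]            # None marks the current version
    record_created_at: datetime             # system time: when the row was written
    record_updated_at: Optional[datetime] = None


def apply_change(
    history: List[InstrumentVersion],
    exchange: str,
    symbol: str,
    tick_size: str,
    effective_at: datetime,
    recorded_at: datetime,
) -> List[InstrumentVersion]:
    """Return a new history with the current version closed and a new one appended."""
    updated = []
    for version in history:
        same_key = (version.exchange, version.symbol) == (exchange, symbol)
        if same_key and version.valid_to is None:
            if version.tick_size == tick_size:
                return list(history)  # no change: keep history as-is
            # Close the old version at the moment the new one takes effect.
            version = replace(version, valid_to=effective_at, record_updated_at=recorded_at)
        updated.append(version)
    updated.append(
        InstrumentVersion(exchange, symbol, tick_size, effective_at, None, recorded_at)
    )
    return updated
```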

Gold Layer:

  • Table: gold_symbology_master
  • Schema: Canonical ID + exchange mappings
  • Partitioning: truncate(base_asset, 1) (first letter: B, E, S, etc.)
  • One row per canonical ID: BTC-USD-SPOT, ETH-USDT-PERP, etc.
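
A canonical ID of the form shown above could be assembled roughly like this. The alias table is an assumption containing only the XBT→BTC case; the project's real normalization rules live in its DBT macros and may differ.

```python
# Illustrative alias table; only XBT -> BTC is assumed here.
ASSET_ALIASES = {"XBT": "BTC"}


def normalize_asset(asset: str) -> str:
    """Upper-case the asset code and map known exchange-specific aliases."""
    code = asset.strip().upper()
    return ASSET_ALIASES.get(code, code)


def canonical_id(base_asset: str, quote_asset: str, instrument_class: str) -> str:
    """Build an ID such as BTC-USD-SPOT from its three components."""
    return "-".join(
        [
            normalize_asset(base_asset),
            normalize_asset(quote_asset),
            instrument_class.strip().upper(),
        ]
    )
```

For example, canonical_id("xbt", "usd", "spot") yields BTC-USD-SPOT, so Kraken's XBT and Binance's BTC land on the same row.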

Technology:

  • Iceberg Format Version: 2 (row-level operations, delete files)
  • Catalog: REST Catalog (http://iceberg-rest:8181)
  • Object Storage: MinIO (S3-compatible)
  • Bucket: refdata-warehouse
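
To make the partition layouts above concrete, the sketch below reproduces the values that the days, months, and truncate transforms produce according to the Iceberg specification (days and months counted from 1970-01-01, truncate as a string prefix). The function names are illustrative; Iceberg computes these values itself at write time.

```python
from datetime import datetime, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def days_transform(ts: datetime) -> int:
    """Days since 1970-01-01 (Bronze: days(ingestion_timestamp)). Expects an aware UTC datetime."""
    return (ts - EPOCH).days


def months_transform(ts: datetime) -> int:
    """Months since 1970-01 (Silver: months(valid_from))."""
    return (ts.year - 1970) * 12 + (ts.month - 1)


def truncate_transform(value: str, width: int) -> str:
    """First `width` characters (Gold: truncate(base_asset, 1))."""
    return value[:width]
```

Two Silver rows for the same exchange with valid_from in the same calendar month share a months value and therefore a partition, which is why a query filtered on exchange and a date range touches only a few partitions.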

3. Transformation Layer (DBT)

Purpose: Transform Bronze → Silver → Gold with data quality tests

Architecture:

dbt/
├── models/
│   ├── bronze/
│   │   └── sources.yml              # Define Iceberg sources
│   ├── silver/
│   │   ├── silver_instruments.sql   # SCD Type 2 transformation
│   │   └── silver_instruments.yml   # Data quality tests
│   └── gold/
│       ├── gold_symbology_master.sql
│       └── gold_symbology_master.yml
├── macros/
│   ├── bitemporal_scd2.sql          # Custom SCD Type 2 macro
│   ├── normalize_asset.sql          # XBT→BTC, USDT→USD
│   └── normalize_instrument_type.sql
└── tests/
    ├── test_no_temporal_overlaps.sql
    └── test_every_instrument_has_current.sql

Execution:

  • Trigger: Kubernetes CronJob (hourly, after ingestion completes)
  • Engine: DuckDB with Iceberg extension
  • Incremental Models: on_schema_change='append_new_columns'
  • Artifacts: manifest.json, run_results.json, catalog.json

Data Quality Tests:

  • Built-in: unique, not_null, relationships, accepted_values
  • Custom: temporal consistency (no overlapping validity periods), completeness (every instrument has a current record)
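
The two custom checks can be expressed outside SQL as in the sketch below. It is an illustration of the logic only: the tuple layout and function names are assumptions, and the project's actual tests are the SQL files under dbt/tests/.

```python
from datetime import datetime
from typing import Dict, List, Optional, Tuple

# (exchange, symbol, valid_from, valid_to); valid_to=None marks the current version
Version = Tuple[str, str, datetime, Optional[datetime]]
Key = Tuple[str, str]


def find_overlaps(versions: List[Version]) -> List[Key]:
    """Keys whose [valid_from, valid_to) intervals overlap."""
    by_key: Dict[Key, List[Tuple[datetime, Optional[datetime]]]] = {}
    for exchange, symbol, valid_from, valid_to in versions:
        by_key.setdefault((exchange, symbol), []).append((valid_from, valid_to))
    bad = []
    for key, spans in by_key.items():
        spans.sort(key=lambda span: span[0])
        for (_, prev_to), (next_from, _) in zip(spans, spans[1:]):
            # An open-ended version followed by another one also counts as an overlap.
            if prev_to is None or prev_to > next_from:
                bad.append(key)
                break
    return bad


def missing_current(versions: List[Version]) -> List[Key]:
    """Keys that have no version with valid_to = None."""
    keys = {(e, s) for e, s, _, _ in versions}
    current = {(e, s) for e, s, _, valid_to in versions if valid_to is None}
    return sorted(keys - current)
```

Both functions return an empty list when the data is healthy, mirroring how a DBT test passes when its query returns no rows.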

4. Query Layer (FastAPI + DuckDB)

Purpose: RESTful API for point-in-time queries and symbology lookups

Architecture:

src/refdata/api/
├── main.py                    # FastAPI app entrypoint
├── routers/
│   ├── instruments.py         # /v1/instruments endpoints
│   ├── symbology.py           # /v1/symbology endpoints
│   └── health.py              # /v1/health
├── models.py                  # Pydantic request/response models
├── dependencies.py            # Dependency injection (DB connections)
└── middleware/
    ├── logging.py             # Request logging
    ├── correlation_id.py      # X-Correlation-ID
    └── cache_control.py       # Cache headers

Query Engine: DuckDB

  • In-process: No network hop to separate query engine
  • OLAP-optimized: Fast analytical queries (aggregations, joins)
  • Iceberg integration: Reads directly via iceberg_scan('s3://...')
  • Connection pool: 5-50 connections (based on load)

Key Endpoints:

  1. GET /v1/instruments?exchange={ex}&symbol={sym}&as_of={ts}

    • Point-in-time query (bitemporal logic)
    • Returns specifications effective at as_of timestamp
    • Handles late corrections (ORDER BY record_created_at DESC)
  2. GET /v1/instruments/{exchange}/{symbol}/history

    • Full audit trail of changes
    • Shows all valid_from/valid_to periods
    • Includes change_reason and changed_by
  3. GET /v1/symbology/{canonical_id}

    • Canonical → exchange symbol lookup
    • Returns all exchange mappings
  4. GET /v1/symbology/resolve?exchange={ex}&symbol={sym}

    • Exchange → canonical ID lookup
    • Reverse symbology mapping
  5. GET /v1/health

    • Health check (DuckDB, Iceberg catalog, Kafka connectivity)
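
The point-in-time lookup behind the first endpoint can be sketched as a single query. To keep the example runnable with only the standard library, sqlite3 is used here as a stand-in for DuckDB, the table is reduced to a few columns, and timestamps are ISO-8601 strings (which compare correctly as text). The query text is an assumption about how the filter could look, not the service's actual SQL.

```python
import sqlite3
from typing import Optional

PIT_QUERY = """
SELECT tick_size
FROM silver_instruments
WHERE exchange = :exchange
  AND symbol = :symbol
  AND valid_from <= :as_of
  AND (valid_to IS NULL OR valid_to > :as_of)
ORDER BY record_created_at DESC   -- latest correction wins
LIMIT 1
"""


def tick_size_as_of(conn: sqlite3.Connection, exchange: str, symbol: str, as_of: str) -> Optional[str]:
    """Return the tick size effective at as_of, or None if no version applies."""
    row = conn.execute(
        PIT_QUERY, {"exchange": exchange, "symbol": symbol, "as_of": as_of}
    ).fetchone()
    return row[0] if row else None


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE silver_instruments (exchange TEXT, symbol TEXT, valid_from TEXT, "
    "valid_to TEXT, record_created_at TEXT, tick_size TEXT)"
)
conn.executemany(
    "INSERT INTO silver_instruments VALUES (?, ?, ?, ?, ?, ?)",
    [
        ("binance", "BTCUSDT", "2024-01-01T00:00:00Z", "2024-06-01T00:00:00Z", "2024-01-01T00:05:00Z", "0.01"),
        # Late correction: same validity period, recorded later.
        ("binance", "BTCUSDT", "2024-01-01T00:00:00Z", "2024-06-01T00:00:00Z", "2024-02-10T09:00:00Z", "0.10"),
        ("binance", "BTCUSDT", "2024-06-01T00:00:00Z", None, "2024-06-01T00:05:00Z", "0.50"),
    ],
)
```

With this sample data, a lookup for March 2024 returns the corrected value rather than the originally recorded one, because the later record_created_at sorts first.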

5. Observability

Logging:

  • Framework: structlog (JSON structured logging)
  • Format: {"timestamp": "...", "level": "INFO", "message": "...", "exchange": "binance", ...}
  • Levels: DEBUG (dev only), INFO (events), WARNING (degraded), ERROR (failures), CRITICAL (system down)

Metrics (Prometheus):

# Ingestion
refdata_ingestion_success_total{exchange="binance"}
refdata_ingestion_failures_total{exchange="binance"}
refdata_ingestion_duration_seconds{exchange="binance"}
refdata_time_since_last_ingestion_seconds{exchange="binance"}

# API
refdata_api_requests_total{method="GET", endpoint="/v1/instruments", status="200"}
refdata_api_request_duration_seconds{method="GET", endpoint="/v1/instruments"}

# DBT
refdata_dbt_model_execution_duration_seconds{model="silver_instruments"}
refdata_dbt_test_failures_total{test="unique_instrument_sk"}

Alerting:

  • 3 consecutive ingestion failures → PagerDuty
  • time_since_last_ingestion > 2 * polling_interval → Warning
  • DBT test failures → Slack alert
  • API p99 latency > 1s → Warning
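
The two ingestion alert conditions above can be written as a small predicate. This is only a sketch of the rule logic: in practice these would likely be Prometheus alerting rules, and the function name, the return labels, and the 3600-second default (the hourly polling interval) are assumptions.

```python
from typing import List


def ingestion_alerts(
    recent_results: List[bool],              # oldest first; True = successful run
    seconds_since_last_success: float,
    polling_interval_seconds: float = 3600.0,
) -> List[str]:
    """Return the alert channels that should fire for one exchange."""
    alerts = []
    # Three consecutive failures page the on-call engineer.
    if len(recent_results) >= 3 and not any(recent_results[-3:]):
        alerts.append("pagerduty")
    # Staleness beyond twice the polling interval raises a warning.
    if seconds_since_last_success > 2 * polling_interval_seconds:
        alerts.append("warning")
    return alerts
```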

Dashboards (Grafana):

  • Ingestion health (last successful ingestion per exchange)
  • API performance (request rate, latency percentiles, error rate)
  • Data quality (DBT test results over time)
  • Resource usage (CPU, memory, disk I/O)

Data Model

Bronze Schema

CREATE TABLE refdata.bronze_instruments_binance (
    ingestion_id STRING NOT NULL,           -- MD5(api_response + timestamp)
    ingestion_timestamp TIMESTAMP NOT NULL,
    api_endpoint STRING NOT NULL,
    api_response_raw STRING NOT NULL,       -- Full JSON
    response_size_bytes INT,
    http_status_code INT,
    PRIMARY KEY (ingestion_id)
) USING iceberg
PARTITIONED BY (days(ingestion_timestamp));

Silver Schema

CREATE TABLE refdata.silver_instruments (
    instrument_sk STRING PRIMARY KEY,

    -- Natural key
    exchange STRING NOT NULL,
    symbol STRING NOT NULL,

    -- Bitemporal dimensions
    valid_from TIMESTAMP NOT NULL,
    valid_to TIMESTAMP,
    record_created_at TIMESTAMP NOT NULL,
    record_updated_at TIMESTAMP,

    -- Core fields
    instrument_type STRING,
    base_asset STRING,
    quote_asset STRING,
    status STRING,

    -- Specifications
    tick_size DECIMAL(38, 18),
    lot_size DECIMAL(38, 18),
    min_notional DECIMAL(38, 18),
    max_leverage INT,

    -- Perpetual-specific
    funding_interval_hours INT,
    settlement_asset STRING,

    -- Vendor data (exchange-specific fields)
    vendor_data STRING,

    -- Audit trail
    source_system STRING NOT NULL,
    change_reason STRING,
    changed_by STRING

) USING iceberg
PARTITIONED BY (exchange, months(valid_from));

Gold Schema

CREATE TABLE refdata.gold_symbology_master (
    canonical_id STRING PRIMARY KEY,
    base_asset STRING NOT NULL,
    quote_asset STRING NOT NULL,
    instrument_class STRING NOT NULL,

    -- Exchange mappings
    binance_symbol STRING,
    kraken_symbol STRING,
    bybit_symbol STRING,
    coinbase_symbol STRING,

    parent_canonical_id STRING,
    first_seen_at TIMESTAMP NOT NULL,
    last_seen_at TIMESTAMP,
    is_active BOOLEAN DEFAULT TRUE

) USING iceberg
PARTITIONED BY (truncate(base_asset, 1));

Deployment Architecture

Docker Compose (Development)

services:
  refdata-api:           # FastAPI on port 8001
  refdata-ingestion:     # Scheduled ingestion (hourly)
  refdata-dbt:           # DBT transformations (manual/scheduled)
  postgres:              # State store (shared with k2)
  minio:                 # Object storage (shared with k2)
  kafka:                 # Streaming (shared with k2)
  schema-registry:       # Avro schemas (shared with k2)
  iceberg-rest:          # Iceberg catalog (shared with k2)

Kubernetes (Production)

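# API Deployment
apiVersion: apps/v1
kind: Deployment
metadata:
  name: refdata-api
spec:
  replicas: 3
  selector:
    matchLabels:
      app: refdata-api        # label name is illustrative
  template:
    metadata:
      labels:
        app: refdata-api
    spec:
      containers:
      - name: api
        image: k2-refdata-api:latest
        resources:
          requests:
            memory: "512Mi"
            cpu: "500m"
          limits:
            memory: "2Gi"
            cpu: "2000m"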

---
# Ingestion CronJob
apiVersion: batch/v1
kind: CronJob
metadata:
  name: refdata-ingestion
spec:
  schedule: "0 * * * *"  # Hourly
  jobTemplate:
    spec:
      template:
        spec:
          restartPolicy: OnFailure  # Jobs require OnFailure or Never; choice is illustrative
          containers:
          - name: ingest
            image: k2-refdata-api:latest
            command: ["python", "-m", "refdata.cli.ingest", "--source", "all"]

---
# DBT CronJob
apiVersion: batch/v1
kind: CronJob
metadata:
  name: refdata-dbt
spec:
  schedule: "15 * * * *"  # 15 min after ingestion
  jobTemplate:
    spec:
      template:
        spec:
          restartPolicy: OnFailure  # Jobs require OnFailure or Never; choice is illustrative
          containers:
          - name: dbt
            image: k2-refdata-dbt:latest
            command: ["dbt", "run", "--profiles-dir", "/usr/app/dbt"]

Security

API Authentication (future):

  • API keys for external clients
  • OAuth2/JWT for internal services
  • Rate limiting per client (100 req/min)

Network Security:

  • API exposed via Kubernetes Ingress with TLS
  • Internal services (Kafka, PostgreSQL) not exposed externally
  • MinIO access via IAM roles (not long-lived credentials)

Data Sensitivity:

  • Reference data is public (no PII, no sensitive trading data)
  • API keys stored in Kubernetes Secrets
  • PostgreSQL credentials in environment variables (injected at runtime)

Scalability

Current Scale:

  • Instruments: ~10,000 (Binance: 1,500, Kraken: 500)
  • Changes per day: ~10-50 across all exchanges
  • API requests: 100-1,000 req/min (low volume)

Scaling Strategy:

Component   Current                Scaling Approach
----------  ---------------------  ---------------------------------------------
Ingestion   Single cron job        Horizontal: Multiple workers per exchange
Kafka       1 partition/topic      Horizontal: Increase partitions if needed
Iceberg     DuckDB (single-node)   Migrate to Trino/Spark if >1M instruments
API         3 replicas             Horizontal: Kubernetes HPA (CPU/memory-based)
DBT         Single job             Parallel: dbt run --threads 4

Performance Targets:

  • Ingestion latency: < 5 min per run (the polling interval itself defaults to hourly)
  • DBT transformation: < 5 min (hourly run)
  • API latency: p95 < 100ms, p99 < 500ms
  • Data freshness: < 1 hour (end-to-end)

Disaster Recovery

Backup Strategy:

  • Iceberg: S3 versioning enabled (restore to any snapshot)
  • PostgreSQL: Daily backups to S3 (retain 30 days)
  • Kafka: 7-day retention (Bronze can be replayed from Kafka within that window)

Recovery Procedures:

  1. Bronze corruption: Replay from Kafka (if within 7 days)
  2. Silver corruption: Re-run DBT from Bronze (Bronze retained 7 days)
  3. Gold corruption: Re-run DBT from Silver
  4. API outage: Kubernetes restarts; stateless design
  5. Complete data loss: Re-ingest from exchanges (history lost)

RTO/RPO:

  • RTO (Recovery Time Objective): 1 hour
  • RPO (Recovery Point Objective): 1 hour (last successful ingestion)

Related Documentation