The K2 Reference Data Platform is a production-grade system for managing cryptocurrency instrument reference data with full temporal fidelity. It ingests specifications from multiple exchanges, transforms them through a medallion architecture, and exposes them via a RESTful API.
```text
┌──────────────────────────────────────────────────────────────────────┐
│ INGESTION LAYER                                                      │
│  ┌────────────┐   ┌────────────┐   ┌────────────┐                    │
│  │  Binance   │   │   Kraken   │   │   Bybit    │   REST Clients     │
│  │   Client   │   │   Client   │   │   Client   │   (Python/httpx)   │
│  └──────┬─────┘   └──────┬─────┘   └──────┬─────┘                    │
│         │                │                │                          │
│         └────────────────┼────────────────┘                          │
│                          │                                           │
│                   ┌──────▼──────┐                                    │
│                   │    Kafka    │   Topics: refdata.*.raw            │
│                   │   Topics    │   Avro Serialization               │
│                   └──────┬──────┘   Idempotent Producer              │
└──────────────────────────┼───────────────────────────────────────────┘
                           │
┌──────────────────────────▼───────────────────────────────────────────┐
│ STORAGE LAYER (Iceberg)                                              │
│                                                                      │
│ ┌──────────────────────────────────────────────────────────────────┐ │
│ │ BRONZE: Raw Event Log                                            │ │
│ │ - bronze_instruments_binance (JSON, 7-day retention)             │ │
│ │ - bronze_instruments_kraken  (JSON, 7-day retention)             │ │
│ │ - Partitioned by: days(ingestion_timestamp)                      │ │
│ └────────────────────────┬─────────────────────────────────────────┘ │
│                          │  DBT Transformations (SCD Type 2)         │
│ ┌────────────────────────▼─────────────────────────────────────────┐ │
│ │ SILVER: Bitemporal Dimensions                                    │ │
│ │ - silver_instruments (SCD Type 2 + Bitemporal)                   │ │
│ │ - Partitioned by: exchange, months(valid_from)                   │ │
│ │ - Columns: exchange, symbol, valid_from, valid_to,               │ │
│ │            record_created_at, tick_size, lot_size, etc.          │ │
│ └────────────────────────┬─────────────────────────────────────────┘ │
│                          │  DBT Symbology Mapping                    │
│ ┌────────────────────────▼─────────────────────────────────────────┐ │
│ │ GOLD: Analytics-Ready                                            │ │
│ │ - gold_symbology_master (Canonical IDs)                          │ │
│ │ - Partitioned by: truncate(base_asset, 1)                        │ │
│ │ - Cross-exchange mapping: binance_symbol, kraken_symbol, etc.    │ │
│ └──────────────────────────────────────────────────────────────────┘ │
└──────────────────────────┬───────────────────────────────────────────┘
                           │
┌──────────────────────────▼───────────────────────────────────────────┐
│ QUERY LAYER                                                          │
│ ┌──────────────────────────────────────────────────────────────────┐ │
│ │ FastAPI Application                                              │ │
│ │ - Point-in-time queries (bitemporal logic)                       │ │
│ │ - Symbology lookups (canonical ↔ exchange)                       │ │
│ │ - Audit trail (historical changes)                               │ │
│ │ - Health checks                                                  │ │
│ └────────────────────────┬─────────────────────────────────────────┘ │
│                          │                                           │
│ ┌────────────────────────▼─────────────────────────────────────────┐ │
│ │ DuckDB Query Engine                                              │ │
│ │ - In-process OLAP engine                                         │ │
│ │ - Iceberg extension (reads directly from S3/MinIO)               │ │
│ │ - Connection pooling (5-50 connections)                          │ │
│ └──────────────────────────────────────────────────────────────────┘ │
└──────────────────────────┬───────────────────────────────────────────┘
                           │
                   ┌───────▼───────┐
                   │  API Clients  │   Trading systems, analysts, etc.
                   └───────────────┘
```
Purpose: Fetch instrument specifications from exchange APIs and publish to Kafka
Components:
- Exchange Clients (`src/refdata/ingestion/sources/`)
  - REST clients with rate limiting (10 req/sec)
  - Retry logic with exponential backoff (3 attempts)
  - Response validation and parsing
  - Supported exchanges: Binance, Kraken, Bybit (future)
- Kafka Producer (`src/refdata/ingestion/producers.py`)
  - Idempotent producer (`enable.idempotence=true`)
  - Avro serialization via Schema Registry
  - Deterministic message IDs (MD5 hash)
  - Change detection (skip publish if no changes)
- Scheduler (`src/refdata/cli/ingest.py`)
  - Cron-based polling (default: hourly)
  - Manual trigger support (`make ingest-now`)
  - State store (PostgreSQL) tracks last successful ingestion
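The client-side rate limiting and retry behaviour can be sketched with the standard library alone. This is a minimal sketch under stated assumptions, not the code in `src/refdata/ingestion/sources/`. `RateLimiter` and `fetch_with_retry` are hypothetical names. The 1 s base delay is an assumption. The broad `except Exception` would normally be narrowed, for example to `httpx.HTTPError`.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


class RateLimiter:
    """Spaces calls at least 1/rate seconds apart (10 req/sec -> 0.1 s)."""

    def __init__(self, rate_per_sec: float,
                 clock: Callable[[], float] = time.monotonic,
                 sleep: Callable[[float], None] = time.sleep) -> None:
        self.min_interval = 1.0 / rate_per_sec
        self.clock = clock
        self.sleep = sleep
        self._next_allowed = 0.0

    def acquire(self) -> None:
        now = self.clock()
        if now < self._next_allowed:
            self.sleep(self._next_allowed - now)  # wait for the next free slot
            now = self._next_allowed
        self._next_allowed = now + self.min_interval


def fetch_with_retry(fetch: Callable[[], T], limiter: RateLimiter,
                     attempts: int = 3, base_delay: float = 1.0,
                     sleep: Callable[[float], None] = time.sleep) -> T:
    """Call fetch() up to `attempts` times with exponential backoff between failures."""
    if attempts < 1:
        raise ValueError("attempts must be >= 1")
    for attempt in range(attempts):
        limiter.acquire()
        try:
            return fetch()
        except Exception:  # hypothetical: narrow this (e.g. httpx.HTTPError) in real code
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            sleep(base_delay * 2 ** attempt)  # 1 s, then 2 s, ...
    raise RuntimeError("unreachable")
```

The clock and sleep functions are injectable so the timing can be tested without real waits. With 3 attempts, a persistently failing endpoint costs two backoff sleeps (1 s and 2 s) before the error is raised.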
Data Flow:
- Scheduler triggers ingestion job
- Exchange client fetches `/exchangeInfo` (or equivalent)
- Compute response hash; compare with last ingestion
- If changed: serialize to Avro, publish to Kafka
- Update state store with hash and timestamp
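The hash-compare-publish cycle above can be sketched as follows, under several assumptions:
- `state` is a plain dict standing in for the PostgreSQL state store.
- `publish(message_id, payload)` stands in for the Avro Kafka producer.
- The message-ID formula (MD5 of exchange plus response hash) is a hypothetical choice that satisfies "deterministic". The project's actual ID inputs are not specified here.

```python
import hashlib
import json
from datetime import datetime, timezone
from typing import Callable


def response_hash(payload: dict) -> str:
    # sort_keys makes the hash independent of JSON key order in the response
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    return hashlib.md5(canonical.encode("utf-8")).hexdigest()


def ingest_once(exchange: str, payload: dict, state: dict[str, dict],
                publish: Callable[[str, dict], None]) -> bool:
    """Run one ingestion cycle; return True if a message was published."""
    digest = response_hash(payload)
    last = state.get(exchange)
    if last is not None and last["hash"] == digest:
        return False  # unchanged since the last ingestion: skip publish
    # Hypothetical deterministic ID: same exchange + content -> same ID
    message_id = hashlib.md5(f"{exchange}:{digest}".encode("utf-8")).hexdigest()
    publish(message_id, payload)
    # Record the new hash only after publish succeeds, so a failed publish is retried
    state[exchange] = {"hash": digest, "ingested_at": datetime.now(timezone.utc)}
    return True
```

Updating the state store last matters. If the Kafka publish raises, the stored hash still points at the previous response, and the next run publishes again. Because the producer is idempotent and the ID is deterministic, that second attempt is safe.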
Topics:
- `refdata.instruments.binance.raw` (1 partition)
- `refdata.instruments.kraken.raw` (1 partition)
- Retention: 7 days (sufficient for replay)
Purpose: Durable storage with ACID properties, schema evolution, and time travel

Bronze Layer:
- Tables: `bronze_instruments_binance`, `bronze_instruments_kraken`
- Schema (key columns): `(ingestion_id, ingestion_timestamp, api_response_raw TEXT)`
- Partitioning: `days(ingestion_timestamp)` (daily partitions)
- Format: Parquet with Zstandard compression
- Retention: 7 days (long enough to rebuild Silver from Bronze after a schema change)
Silver Layer:
- Table: `silver_instruments`
- Schema: structured columns plus a `vendor_data` JSON escape hatch
- Partitioning: `(exchange, months(valid_from))`
  - Exchange filter prunes partitions (90% of queries specify exchange)
  - Monthly granularity (not daily), because reference data changes infrequently
- SCD Type 2: `valid_from`, `valid_to` (business time)
- Bitemporal: `record_created_at`, `record_updated_at` (system time)
- Target partition size: 100+ MB (Iceberg best practice)
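SCD Type 2 with bitemporal columns means two things:
- A change never overwrites a row. It closes the current version (sets `valid_to`) and appends a new one.
- The system-time columns record when each row was written or modified.

The Python sketch below shows that rule for a single attribute (`tick_size`). It is illustrative only. The real transformation lives in the dbt macro `bitemporal_scd2.sql`, and `Version`/`apply_change` are hypothetical names.

```python
from dataclasses import dataclass, replace
from datetime import datetime
from decimal import Decimal
from typing import Optional


@dataclass(frozen=True)
class Version:
    exchange: str
    symbol: str
    tick_size: Decimal
    valid_from: datetime                          # business time: spec takes effect
    valid_to: Optional[datetime]                  # business time: None = current version
    record_created_at: datetime                   # system time: row first written
    record_updated_at: Optional[datetime] = None  # system time: row last modified


def apply_change(history: list[Version], exchange: str, symbol: str,
                 tick_size: Decimal, effective_at: datetime,
                 recorded_at: datetime) -> list[Version]:
    """Return a new history with the observation applied SCD Type 2 style."""
    current = next((v for v in history
                    if v.exchange == exchange and v.symbol == symbol
                    and v.valid_to is None), None)
    if current is not None and current.tick_size == tick_size:
        return history  # attributes unchanged: keep the open version as-is
    # Close the open version at the effective time, stamping system time
    out = [replace(v, valid_to=effective_at, record_updated_at=recorded_at)
           if v is current else v
           for v in history]
    out.append(Version(exchange, symbol, tick_size, effective_at, None, recorded_at))
    return out
```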
Gold Layer:
- Table: `gold_symbology_master`
- Schema: canonical ID + exchange mappings
- Partitioning: `truncate(base_asset, 1)` (first letter: B, E, S, etc.)
- One row per canonical ID: `BTC-USD-SPOT`, `ETH-USDT-PERP`, etc.
Technology:
- Iceberg format version: 2 (row-level operations, delete files)
- Catalog: REST Catalog (`http://iceberg-rest:8181`)
- Object storage: MinIO (S3-compatible)
- Bucket: `refdata-warehouse`
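The three partition transforms used by these tables (`days`, `months`, `truncate`) are defined in the Iceberg table spec:
- `days`: whole days since 1970-01-01.
- `months`: whole months since 1970-01.
- `truncate` on strings: the first W code points.

The sketch reproduces those values in plain Python to show which partition a row lands in. Iceberg computes them itself, and the sketch assumes naive timestamps that are already UTC. The function names are hypothetical.

```python
from datetime import date, datetime

EPOCH = date(1970, 1, 1)


def days_transform(ts: datetime) -> int:
    """Iceberg days(ts): whole days since 1970-01-01 (naive ts assumed UTC)."""
    return (ts.date() - EPOCH).days


def months_transform(ts: datetime) -> int:
    """Iceberg months(ts): whole months since 1970-01."""
    return (ts.year - 1970) * 12 + (ts.month - 1)


def truncate_string(value: str, width: int) -> str:
    """Iceberg truncate(W) on strings: keep at most W code points."""
    return value[:width]  # Python slices strings by code point


def silver_partition(exchange: str, valid_from: datetime) -> tuple[str, int]:
    """Partition tuple for PARTITIONED BY (exchange, months(valid_from))."""
    return (exchange, months_transform(valid_from))
```

For example, a Binance version effective on 2024-01-15 lands in partition `("binance", 648)`, and every Gold row whose `base_asset` starts with "B" shares the partition `"B"`.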
Purpose: Transform Bronze → Silver → Gold with data quality tests
Architecture:
```text
dbt/
├── models/
│   ├── bronze/
│   │   └── sources.yml               # Define Iceberg sources
│   ├── silver/
│   │   ├── silver_instruments.sql    # SCD Type 2 transformation
│   │   └── silver_instruments.yml    # Data quality tests
│   └── gold/
│       ├── gold_symbology_master.sql
│       └── gold_symbology_master.yml
├── macros/
│   ├── bitemporal_scd2.sql           # Custom SCD Type 2 macro
│   ├── normalize_asset.sql           # XBT→BTC, USDT→USD
│   └── normalize_instrument_type.sql
└── tests/
    ├── test_no_temporal_overlaps.sql
    └── test_every_instrument_has_current.sql
```
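The alias mapping noted for `normalize_asset.sql` amounts to a lookup table applied after upper-casing. Here is a Python sketch of the same rule. It assumes only the two aliases named in the comment (XBT→BTC, USDT→USD). The real macro may cover more, and `ASSET_ALIASES` is a hypothetical name.

```python
# Hypothetical alias table: only the two mappings named in the
# normalize_asset.sql comment; the real macro may cover more.
ASSET_ALIASES = {"XBT": "BTC", "USDT": "USD"}


def normalize_asset(asset: str) -> str:
    """Upper-case an exchange asset code and map known aliases to one name."""
    code = asset.strip().upper()
    return ASSET_ALIASES.get(code, code)
```

For instance, `[normalize_asset(a) for a in ("XBT", "usdt", "ETH")]` gives `["BTC", "USD", "ETH"]`. Unknown codes pass through unchanged rather than failing.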
Execution:
- Trigger: Kubernetes CronJob (hourly, scheduled 15 minutes after ingestion)
- Engine: DuckDB with Iceberg extension
- Incremental models: `on_schema_change='append_new_columns'`
- Artifacts: `manifest.json`, `run_results.json`, `catalog.json`
Data Quality Tests:
- Built-in: `unique`, `not_null`, `relationships`, `accepted_values`
- Custom: temporal consistency (no overlapping validity periods per instrument) and completeness (every instrument has a current record)
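The two custom tests reduce to simple checks over `(exchange, symbol, valid_from, valid_to)` rows. The Python sketch below mirrors what `test_no_temporal_overlaps.sql` and `test_every_instrument_has_current.sql` are described as checking. It is not their SQL, and it assumes half-open validity periods `[valid_from, valid_to)`, where a version ending exactly when the next starts is not an overlap. The function names are hypothetical.

```python
from collections import defaultdict
from datetime import datetime
from typing import Iterable, Optional

# (exchange, symbol, valid_from, valid_to); valid_to None = current version
PeriodRow = tuple[str, str, datetime, Optional[datetime]]


def find_overlaps(rows: Iterable[PeriodRow]) -> list[tuple[str, str]]:
    """Keys whose validity periods overlap (half-open intervals)."""
    by_key = defaultdict(list)
    for exchange, symbol, start, end in rows:
        by_key[(exchange, symbol)].append((start, end))
    bad = []
    for key, periods in by_key.items():
        periods.sort(key=lambda p: p[0])
        for (_, end1), (start2, _) in zip(periods, periods[1:]):
            # An open-ended period (None) overlaps anything that starts later
            if end1 is None or end1 > start2:
                bad.append(key)
                break
    return bad


def missing_current(rows: Iterable[PeriodRow]) -> list[tuple[str, str]]:
    """Keys with no open (valid_to IS NULL) version."""
    keys, current = set(), set()
    for exchange, symbol, _, end in rows:
        keys.add((exchange, symbol))
        if end is None:
            current.add((exchange, symbol))
    return sorted(keys - current)
```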
Purpose: RESTful API for point-in-time queries and symbology lookups
Architecture:
```text
src/refdata/api/
├── main.py               # FastAPI app entrypoint
├── routers/
│   ├── instruments.py    # /v1/instruments endpoints
│   ├── symbology.py      # /v1/symbology endpoints
│   └── health.py         # /v1/health
├── models.py             # Pydantic request/response models
├── dependencies.py       # Dependency injection (DB connections)
└── middleware/
    ├── logging.py        # Request logging
    ├── correlation_id.py # X-Correlation-ID
    └── cache_control.py  # Cache headers
```
Query Engine: DuckDB
- In-process: no network hop to a separate query engine
- OLAP-optimized: fast analytical queries (aggregations, joins)
- Iceberg integration: reads directly via `iceberg_scan('s3://...')`
- Connection pool: 5-50 connections (based on load)
Key Endpoints:
- `GET /v1/instruments?exchange={ex}&symbol={sym}&as_of={ts}`: point-in-time query (bitemporal logic)
  - Returns specifications effective at the `as_of` timestamp
  - Handles late corrections (`ORDER BY record_created_at DESC`)
- `GET /v1/instruments/{exchange}/{symbol}/history`: full audit trail of changes
  - Shows all `valid_from`/`valid_to` periods
  - Includes `change_reason` and `changed_by`
- `GET /v1/symbology/{canonical_id}`: canonical → exchange symbol lookup
  - Returns all exchange mappings
- `GET /v1/symbology/resolve?exchange={ex}&symbol={sym}`: exchange → canonical ID lookup
  - Reverse symbology mapping
- `GET /v1/health`: health check (DuckDB, Iceberg catalog, Kafka connectivity)
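The point-in-time rule behind `GET /v1/instruments` works in two steps:
1. Keep the versions whose business-time period contains `as_of`.
2. If a late correction produced several such rows, take the most recently recorded one.

The sketch expresses that rule over in-memory rows rather than the DuckDB query. It assumes half-open periods `[valid_from, valid_to)`, and `InstrumentRow`/`as_of` are hypothetical names.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Optional


@dataclass(frozen=True)
class InstrumentRow:
    exchange: str
    symbol: str
    valid_from: datetime
    valid_to: Optional[datetime]   # None = still effective
    record_created_at: datetime
    tick_size: str


def as_of(rows: list[InstrumentRow], exchange: str, symbol: str,
          ts: datetime) -> Optional[InstrumentRow]:
    """Version effective at `ts`; the latest-recorded row wins (late corrections)."""
    candidates = [
        r for r in rows
        if r.exchange == exchange and r.symbol == symbol
        and r.valid_from <= ts and (r.valid_to is None or ts < r.valid_to)
    ]
    # Equivalent of ORDER BY record_created_at DESC LIMIT 1
    return max(candidates, key=lambda r: r.record_created_at, default=None)
```

Suppose a correction recorded in April restates the January-March tick size. An `as_of` in February then returns the corrected value, while the original row stays in the table for the audit trail.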
Logging:
- Framework: structlog (JSON structured logging)
- Format: `{"timestamp": "...", "level": "INFO", "message": "...", "exchange": "binance", ...}`
- Levels: DEBUG (dev only), INFO (events), WARNING (degraded), ERROR (failures), CRITICAL (system down)
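The project uses structlog. Purely to illustrate the target JSON shape, here is a stdlib-only `logging.Formatter` that emits the same fields. It is a stand-in, not the structlog configuration. Passing context through a `context` attribute via `extra=` is an assumption of this sketch.

```python
import json
import logging
from datetime import datetime, timezone


class JsonFormatter(logging.Formatter):
    """Render each record as one JSON object: timestamp, level, message + context."""

    def format(self, record: logging.LogRecord) -> str:
        entry = {
            "timestamp": datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat(),
            "level": record.levelname,
            "message": record.getMessage(),
        }
        # Extra fields passed as logger.info(..., extra={"context": {...}})
        entry.update(getattr(record, "context", {}))
        return json.dumps(entry)


def make_logger(name: str = "refdata") -> logging.Logger:
    handler = logging.StreamHandler()
    handler.setFormatter(JsonFormatter())
    logger = logging.getLogger(name)
    logger.handlers[:] = [handler]  # replace any previously attached handlers
    logger.setLevel(logging.INFO)
    return logger
```

Usage: `make_logger().info("ingestion complete", extra={"context": {"exchange": "binance"}})` prints one JSON line that includes `"exchange": "binance"`.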
Metrics (Prometheus):
```text
# Ingestion
refdata_ingestion_success_total{exchange="binance"}
refdata_ingestion_failures_total{exchange="binance"}
refdata_ingestion_duration_seconds{exchange="binance"}
refdata_time_since_last_ingestion_seconds{exchange="binance"}

# API
refdata_api_requests_total{method="GET", endpoint="/v1/instruments", status="200"}
refdata_api_request_duration_seconds{method="GET", endpoint="/v1/instruments"}

# DBT
refdata_dbt_model_execution_duration_seconds{model="silver_instruments"}
refdata_dbt_test_failures_total{test="unique_instrument_sk"}
```
Alerting:
- 3 consecutive ingestion failures → PagerDuty
- `time_since_last_ingestion > 2 * polling_interval` → Warning
- DBT test failures → Slack alert
- API p99 latency > 1s → Warning
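The ingestion and latency rules above can be stated as small predicates. In production they would presumably be evaluated as Prometheus alert rules over the metrics listed earlier. This Python sketch only makes the thresholds concrete. `IngestionStatus`, `ingestion_alerts` and `p99_latency_alert` are hypothetical names, and the 3600 s polling interval is the documented hourly default.

```python
import statistics
from dataclasses import dataclass

POLLING_INTERVAL_S = 3600  # hourly polling, the documented default


@dataclass
class IngestionStatus:
    consecutive_failures: int
    seconds_since_last_success: float


def ingestion_alerts(status: IngestionStatus,
                     polling_interval_s: float = POLLING_INTERVAL_S) -> list[str]:
    """Apply the two ingestion rules: 3 failures -> page, stale data -> warning."""
    alerts = []
    if status.consecutive_failures >= 3:
        alerts.append("page")  # routed to PagerDuty
    if status.seconds_since_last_success > 2 * polling_interval_s:
        alerts.append("warning")
    return alerts


def p99_latency_alert(latencies_s: list[float], threshold_s: float = 1.0) -> bool:
    """True if the p99 request latency exceeds the threshold (1 s by default)."""
    if len(latencies_s) < 2:
        return False  # too few samples to estimate a percentile
    p99 = statistics.quantiles(latencies_s, n=100, method="inclusive")[98]
    return p99 > threshold_s
```

With hourly polling, the staleness warning fires once more than two hours pass without a successful run. That tolerates a single missed cycle before anyone is notified.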
Dashboards (Grafana):
- Ingestion health (last successful ingestion per exchange)
- API performance (request rate, latency percentiles, error rate)
- Data quality (DBT test results over time)
- Resource usage (CPU, memory, disk I/O)
```sql
CREATE TABLE refdata.bronze_instruments_binance (
    ingestion_id        STRING NOT NULL,  -- MD5(api_response + timestamp)
    ingestion_timestamp TIMESTAMP NOT NULL,
    api_endpoint        STRING NOT NULL,
    api_response_raw    STRING NOT NULL,  -- Full JSON
    response_size_bytes INT,
    http_status_code    INT
) USING iceberg
PARTITIONED BY (days(ingestion_timestamp));

CREATE TABLE refdata.silver_instruments (
    instrument_sk          STRING NOT NULL,
    -- Natural key
    exchange               STRING NOT NULL,
    symbol                 STRING NOT NULL,
    -- Bitemporal dimensions
    valid_from             TIMESTAMP NOT NULL,
    valid_to               TIMESTAMP,
    record_created_at      TIMESTAMP NOT NULL,
    record_updated_at      TIMESTAMP,
    -- Core fields
    instrument_type        STRING,
    base_asset             STRING,
    quote_asset            STRING,
    status                 STRING,
    -- Specifications
    tick_size              DECIMAL(38, 18),
    lot_size               DECIMAL(38, 18),
    min_notional           DECIMAL(38, 18),
    max_leverage           INT,
    -- Perpetual-specific
    funding_interval_hours INT,
    settlement_asset       STRING,
    -- Vendor data (exchange-specific fields, JSON)
    vendor_data            STRING,
    -- Audit trail
    source_system          STRING NOT NULL,
    change_reason          STRING,
    changed_by             STRING
) USING iceberg
PARTITIONED BY (exchange, months(valid_from));

CREATE TABLE refdata.gold_symbology_master (
    canonical_id        STRING NOT NULL,
    base_asset          STRING NOT NULL,
    quote_asset         STRING NOT NULL,
    instrument_class    STRING NOT NULL,
    -- Exchange mappings
    binance_symbol      STRING,
    kraken_symbol       STRING,
    bybit_symbol        STRING,
    coinbase_symbol     STRING,
    parent_canonical_id STRING,
    first_seen_at       TIMESTAMP NOT NULL,
    last_seen_at        TIMESTAMP,
    is_active           BOOLEAN  -- intended default TRUE; populate on write (Iceberg v2 has no column defaults)
) USING iceberg
PARTITIONED BY (truncate(1, base_asset));  -- Iceberg Spark syntax: truncate(width, column)

-- Iceberg does not enforce primary keys. The intended keys can be declared as
-- identifier fields (requires the Iceberg Spark SQL extensions); uniqueness
-- remains the writer's responsibility.
ALTER TABLE refdata.bronze_instruments_binance SET IDENTIFIER FIELDS ingestion_id;
ALTER TABLE refdata.silver_instruments SET IDENTIFIER FIELDS instrument_sk;
ALTER TABLE refdata.gold_symbology_master SET IDENTIFIER FIELDS canonical_id;
```

```yaml
services:
  refdata-api:        # FastAPI on port 8001
  refdata-ingestion:  # Scheduled ingestion (hourly)
  refdata-dbt:        # DBT transformations (manual/scheduled)
  postgres:           # State store (shared with k2)
  minio:              # Object storage (shared with k2)
  kafka:              # Streaming (shared with k2)
  schema-registry:    # Avro schemas (shared with k2)
  iceberg-rest:       # Iceberg catalog (shared with k2)
```

```yaml
# API Deployment
apiVersion: apps/v1
kind: Deployment
metadata:
  name: refdata-api
spec:
  replicas: 3
  selector:
    matchLabels:
      app: refdata-api
  template:
    metadata:
      labels:
        app: refdata-api
    spec:
      containers:
        - name: api
          image: k2-refdata-api:latest
          resources:
            requests:
              memory: "512Mi"
              cpu: "500m"
            limits:
              memory: "2Gi"
              cpu: "2000m"
---
# Ingestion CronJob
apiVersion: batch/v1
kind: CronJob
metadata:
  name: refdata-ingestion
spec:
  schedule: "0 * * * *"  # Hourly
  jobTemplate:
    spec:
      template:
        spec:
          restartPolicy: OnFailure  # Job pods may not use the default "Always"
          containers:
            - name: ingest
              image: k2-refdata-api:latest
              command: ["python", "-m", "refdata.cli.ingest", "--source", "all"]
---
# DBT CronJob
apiVersion: batch/v1
kind: CronJob
metadata:
  name: refdata-dbt
spec:
  schedule: "15 * * * *"  # 15 min after ingestion
  jobTemplate:
    spec:
      template:
        spec:
          restartPolicy: OnFailure
          containers:
            - name: dbt
              image: k2-refdata-dbt:latest
              command: ["dbt", "run", "--profiles-dir", "/usr/app/dbt"]
```

API Authentication (future):
- API keys for external clients
- OAuth2/JWT for internal services
- Rate limiting per client (100 req/min)
Network Security:
- API exposed via Kubernetes Ingress with TLS
- Internal services (Kafka, PostgreSQL) not exposed externally
- MinIO access via IAM roles (not long-lived credentials)
Data Sensitivity:
- Reference data is public (no PII, no sensitive trading data)
- API keys stored in Kubernetes Secrets
- PostgreSQL credentials in environment variables (injected at runtime)
Current Scale:
- Instruments: ~10,000 (Binance: 1,500, Kraken: 500)
- Changes per day: ~10-50 across all exchanges
- API requests: 100-1,000 req/min (low volume)
Scaling Strategy:
| Component | Current | Scaling Approach |
|---|---|---|
| Ingestion | Single cron job | Horizontal: Multiple workers per exchange |
| Kafka | 1 partition/topic | Horizontal: Increase partitions if needed |
| Iceberg | DuckDB (single-node) | Migrate to Trino/Spark if >1M instruments |
| API | 3 replicas | Horizontal: Kubernetes HPA (CPU/memory-based) |
| DBT | Single job | Parallel: dbt run --threads 4 |
Performance Targets:
- Ingestion latency: < 5 min per polling run
- DBT transformation: < 5 min (hourly run)
- API latency: p95 < 100ms, p99 < 500ms
- Data freshness: < 1 hour (end-to-end)
Backup Strategy:
- Iceberg: S3 versioning enabled (restore to any snapshot)
- PostgreSQL: Daily backups to S3 (retain 30 days)
- Kafka: 7-day retention (can replay from Bronze)
Recovery Procedures:
- Bronze corruption: Replay from Kafka (if within 7 days)
- Silver corruption: Re-run DBT from Bronze (Bronze retained 7 days)
- Gold corruption: Re-run DBT from Silver
- API outage: Kubernetes restarts; stateless design
- Complete data loss: Re-ingest from exchanges (history lost)
RTO/RPO:
- RTO (Recovery Time Objective): 1 hour
- RPO (Recovery Point Objective): 1 hour (last successful ingestion)
- ADR-001: Bitemporal Modeling
- ADR-002: Ingestion Strategy
- ADR-003: DBT vs Spark
- ADR-004: Symbology Mapping
- ADR-005: Schema Evolution
- SCHEMA.md - Detailed schema definitions and ERD
- SETUP.md - Local development setup
- TESTING.md - Testing strategy and guidelines