| layout | default |
|---|---|
| title | Chapter 3: Data Ingestion & ETL |
| parent | ClickHouse Tutorial |
| nav_order | 3 |
Welcome to Chapter 3: Data Ingestion & ETL. In this part of ClickHouse Tutorial: High-Performance Analytical Database, you will build an intuitive mental model first, then move into concrete implementation details and practical production tradeoffs.
Now that you have well-designed schemas, let's explore how to efficiently load data into ClickHouse. Data ingestion is where ClickHouse truly shines, offering multiple methods for getting data into your analytical database quickly and reliably.
-- Single row insert
INSERT INTO events VALUES (1, '2024-01-01 10:00:00', 'user_login', 123);
-- Multiple rows
INSERT INTO events VALUES
(1, '2024-01-01 10:00:00', 'user_login', 123),
(2, '2024-01-01 10:01:00', 'page_view', 456),
(3, '2024-01-01 10:02:00', 'purchase', 789);
-- Insert with column names
INSERT INTO events (id, timestamp, event_type, user_id)
VALUES (4, now(), 'signup', 101);
# Load data from CSV file
clickhouse-client --query="
INSERT INTO events
FORMAT CSV
" < events.csv
# Load from TSV with headers
clickhouse-client --query="
INSERT INTO events
FORMAT TSVWithNames
" < events_with_headers.tsv# Connect to ClickHouse
clickhouse-client --host localhost --port 9000 --user default
# Load data with progress
cat large_dataset.csv | clickhouse-client --query="
INSERT INTO table_name FORMAT CSV
" --progress
# Load compressed data
zcat data.csv.gz | clickhouse-client --query="
INSERT INTO table_name FORMAT CSV
"-- Create external table for CSV processing
CREATE TABLE events_raw (
timestamp String,
user_id UInt32,
event_type String,
value Float64
) ENGINE = File(CSV, '/data/events.csv');
-- Process and insert into final table
INSERT INTO events
SELECT
parseDateTimeBestEffort(timestamp) as timestamp,
user_id,
event_type,
value
FROM events_raw
WHERE user_id != 0; -- user_id is a non-nullable UInt32, so missing values arrive as 0 rather than NULL
-- Load data directly from URL
INSERT INTO events
SELECT *
FROM url('https://example.com/data.csv', CSV, '
timestamp DateTime,
user_id UInt32,
event_type String,
value Float64
');
-- Create Kafka engine table
CREATE TABLE events_kafka (
timestamp DateTime,
user_id UInt32,
event_type String,
value Float64
) ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'localhost:9092',
kafka_topic_list = 'user_events',
kafka_group_name = 'clickhouse_consumer',
kafka_format = 'JSONEachRow';
-- Create materialized view for processing
-- (the view acts as the Kafka consumer: each batch read from events_kafka is
--  transformed by the SELECT below and stored in the view's own MergeTree table)
CREATE MATERIALIZED VIEW events_processed
ENGINE = MergeTree()
PARTITION BY toYYYYMM(timestamp)
ORDER BY (timestamp, user_id)
AS SELECT
timestamp,
user_id,
event_type,
value,
now() as processed_at
FROM events_kafka;
-- RabbitMQ engine table
CREATE TABLE events_rabbitmq (
timestamp DateTime,
user_id UInt32,
event_type String,
value Float64
) ENGINE = RabbitMQ()
SETTINGS
rabbitmq_host_port = 'localhost:5672',
rabbitmq_exchange_name = 'events_exchange',
rabbitmq_format = 'JSONEachRow',
rabbitmq_routing_key_list = 'user.*';
-- Transform data during ingestion
INSERT INTO user_activity_clean
SELECT
user_id,
timestamp,
event_type,
CASE
WHEN event_type = 'login' THEN 1
WHEN event_type = 'logout' THEN 2
ELSE 0
END as event_code,
JSONExtractString(properties, 'page') as page,
JSONExtractFloat(properties, 'duration') as duration
FROM user_activity_raw
WHERE timestamp >= '2024-01-01 00:00:00';
-- Create external dictionary for lookups
CREATE DICTIONARY user_attributes (
user_id UInt64, -- simple-key dictionaries use UInt64 keys
country LowCardinality(String),
age_group LowCardinality(String),
subscription_type LowCardinality(String)
)
PRIMARY KEY user_id
SOURCE(HTTP(URL 'http://api.example.com/users' FORMAT 'JSON'))
LIFETIME(MIN 300 MAX 3600)
LAYOUT(HASHED());
-- Use dictionary in transformations
INSERT INTO enriched_events
SELECT
e.timestamp,
e.user_id,
e.event_type,
e.value,
dictGet('user_attributes', 'country', toUInt64(e.user_id)) as country,
dictGet('user_attributes', 'subscription_type', toUInt64(e.user_id)) as subscription_type
FROM events e;
-- Track last processed timestamp
CREATE TABLE ingestion_state (
table_name String,
last_timestamp DateTime,
last_id UInt64
) ENGINE = MergeTree()
ORDER BY table_name;
-- Incremental load with state tracking
INSERT INTO events
SELECT
timestamp,
user_id,
event_type,
value
FROM events_source
WHERE timestamp > (
SELECT max(last_timestamp) -- state rows accumulate, so read the latest one
FROM ingestion_state
WHERE table_name = 'events'
);
-- Update state
INSERT INTO ingestion_state
SELECT
'events' as table_name,
max(timestamp) as last_timestamp,
max(id) as last_id
FROM events_source;
-- Use watermarks for incremental processing
CREATE TABLE events_watermark (
watermark DateTime DEFAULT now()
) ENGINE = MergeTree()
ORDER BY watermark;
-- Incremental load
INSERT INTO events
SELECT *
FROM events_source
WHERE timestamp > (SELECT max(watermark) FROM events_watermark);
-- Update watermark
INSERT INTO events_watermark SELECT now();
-- Validate data before insertion
CREATE TABLE events_staging (
timestamp String,
user_id String,
event_type String,
value String
) ENGINE = MergeTree()
ORDER BY tuple();
-- Validation query
SELECT
count() as total_rows,
countIf(length(timestamp) > 0) as valid_timestamps,
countIf(user_id != '') as valid_user_ids,
countIf(event_type IN ('login', 'logout', 'purchase')) as valid_event_types
FROM events_staging;
-- Insert only valid data
INSERT INTO events
SELECT
parseDateTimeBestEffort(timestamp),
toUInt32OrNull(user_id),
event_type,
toFloat64OrNull(value)
FROM events_staging
WHERE
length(timestamp) > 0
AND user_id != ''
AND event_type IN ('login', 'logout', 'purchase');
-- Data quality checks
SELECT
'Duplicate events' as check_name,
count() as count
FROM (
SELECT user_id, timestamp, event_type, count() as cnt
FROM events
GROUP BY user_id, timestamp, event_type
HAVING cnt > 1
);
-- Missing data validation
SELECT
'Missing timestamps' as check_name,
countIf(timestamp IS NULL) as count
FROM events;
-- Outlier detection
SELECT
'Value outliers' as check_name,
countIf(value < 0 OR value > 1000000) as count
FROM events;
# Parallel loading with multiple clients
for i in {0..3}; do  # modulo 4 yields buckets 0-3, so iterate from 0
clickhouse-client --query="
INSERT INTO events
SELECT * FROM events_source
WHERE cityHash64(id) % 4 = $i
" &
done
wait
-- Optimize batch sizes
INSERT INTO events
SELECT *
FROM events_source
SETTINGS
max_insert_threads = 8,
max_insert_block_size = 1000000,
min_insert_block_size_rows = 100000;
-- Control memory usage during ingestion
INSERT INTO large_table
SELECT *
FROM source_table
SETTINGS
max_memory_usage = 10000000000, -- 10GB
max_threads = 4,
max_insert_threads = 4;
-- Monitor ingestion performance
-- system.metrics exposes server-wide counters (metric, value); it has no per-table columns
SELECT
metric,
value
FROM system.metrics
WHERE metric ILIKE '%insert%'
OR metric ILIKE '%merge%';
-- Query ingestion statistics
SELECT
query_id,
query,
read_rows,
written_rows,
memory_usage,
query_duration_ms
FROM system.query_log
WHERE query LIKE '%INSERT%'
AND type = 'QueryFinish'
ORDER BY query_duration_ms DESC
LIMIT 10;
-- Identify slow insertions
SELECT
query,
query_duration_ms,
read_bytes,
written_bytes,
memory_usage
FROM system.query_log
WHERE query LIKE '%INSERT%'
AND query_duration_ms > 5000 -- > 5 seconds
ORDER BY query_duration_ms DESC;
-- Monitor merge performance
SELECT
database,
table,
elapsed,
progress,
num_parts,
result_part_name
FROM system.merges
WHERE is_mutation = 0
ORDER BY elapsed DESC;
-- Fact table loading
INSERT INTO fact_sales
SELECT
s.sale_id,
d.date_key,
p.product_key,
c.customer_key,
s.quantity,
s.unit_price,
s.total_amount,
s.discount_amount
FROM staging_sales s
JOIN dim_date d ON s.sale_date = d.full_date
JOIN dim_product p ON s.product_id = p.product_id
JOIN dim_customer c ON s.customer_id = c.customer_id;
-- Dimension updates
INSERT INTO dim_customer
SELECT
customer_id,
customer_name,
email,
phone,
address,
now() as updated_at
FROM staging_customers
WHERE customer_id NOT IN (SELECT customer_id FROM dim_customer);
-- Parse and enrich log data
INSERT INTO processed_logs
SELECT
timestamp,
parseDateTimeBestEffort(timestamp_str) as parsed_timestamp,
level,
message,
JSONExtractString(metadata, 'user_id') as user_id,
JSONExtractString(metadata, 'session_id') as session_id,
arrayMap(x -> JSONExtractString(x, 'tag'), JSONExtractArrayRaw(metadata, 'tags')) as tags,
cityHash64(concat(user_id, session_id)) as session_hash
FROM raw_logs
WHERE length(timestamp_str) > 0
AND level IN ('INFO', 'WARN', 'ERROR');
Excellent! 🎉 You've mastered ClickHouse data ingestion:
- Bulk Loading - Efficiently loading large datasets
- Streaming Ingestion - Real-time data from Kafka/RabbitMQ
- ETL Pipelines - Data transformation and enrichment
- Incremental Loading - Change data capture and watermarks
- Data Quality - Validation and error handling
- Performance Optimization - Parallel loading and memory management
- Monitoring - Tracking ingestion performance and troubleshooting
With data flowing into ClickHouse efficiently, let's focus on writing high-performance analytical queries. In Chapter 4: Query Optimization, we'll explore ClickHouse's query engine and optimization techniques.
Practice what you've learned (a combined end-to-end sketch follows this list):
- Set up a Kafka consumer for real-time data ingestion
- Create an ETL pipeline with data validation
- Implement incremental loading for a large dataset
- Monitor and optimize the performance of your ingestion pipeline
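To tie the validation and incremental-loading exercises together, here is a minimal sketch, assuming the events, events_staging, and events_watermark tables defined earlier in this chapter already exist; the CSV path and the zero-tolerance validation threshold are placeholders to adapt to your own pipeline.
#!/usr/bin/env bash
# Hypothetical batch run: land raw data, validate it, promote it, advance the watermark.
set -euo pipefail
CSV_FILE=${1:-events_batch.csv}  # placeholder input file
# 1. Land the raw batch in staging (all String columns, so malformed rows still load)
clickhouse-client --query="INSERT INTO events_staging FORMAT CSV" < "$CSV_FILE"
# 2. Abort if any row fails basic validation
INVALID=$(clickhouse-client --query="
    SELECT countIf(length(timestamp) = 0 OR toUInt32OrNull(user_id) IS NULL)
    FROM events_staging")
if [ "$INVALID" -gt 0 ]; then
    echo "Batch has $INVALID invalid rows; not promoting" >&2
    exit 1
fi
# 3. Promote typed, validated rows into the target table
clickhouse-client --query="
    INSERT INTO events
    SELECT parseDateTimeBestEffort(timestamp), toUInt32OrNull(user_id), event_type, toFloat64OrNull(value)
    FROM events_staging"
# 4. Clear staging and advance the watermark for the next incremental run
clickhouse-client --query="TRUNCATE TABLE events_staging"
clickhouse-client --query="INSERT INTO events_watermark SELECT now()"
Keeping validation as a separate gate before promotion means a bad batch never reaches the events table, and the watermark only advances on success.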
What type of data source are you planning to ingest into ClickHouse? 📥
Most teams struggle here because the hard part is not writing more code, but drawing clear boundaries between extraction, transformation, and loading so that behavior stays predictable as data volumes and pipeline complexity grow.
In practical terms, this chapter helps you avoid three common failures:
- coupling core logic too tightly to one implementation path
- missing the handoff boundaries between setup, execution, and validation
- shipping changes without a clear rollback or observability strategy
After working through this chapter, you should be able to reason about data ingestion as an operating subsystem of your ClickHouse deployment, with explicit contracts for inputs, state transitions, and outputs.
Use the INSERT patterns and the events tables above as a checklist when adapting these examples to your own schemas and repositories.
Under the hood, an ingestion pipeline like the ones in this chapter usually follows a repeatable control path (the sketch after this list maps each stage to a concrete check):
- Context bootstrap: initialize runtime configuration and prerequisites such as connectivity and target tables.
- Input normalization: shape incoming data so that timestamps, identifiers, and types arrive with stable contracts.
- Core execution: run the main INSERT ... SELECT logic and carry intermediate state through staging tables and watermarks.
- Policy and safety checks: enforce limits, access scopes, and failure boundaries.
- Output composition: write canonical, typed rows for downstream consumers.
- Operational telemetry: emit the logs and metrics needed for debugging and performance tuning.
When debugging, walk this sequence in order and confirm that each stage has explicit success and failure conditions.
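As a hedged illustration, the following sketch maps each stage above to one concrete check you can run from the shell; it reuses the table names defined earlier in this chapter (events_staging, events, events_watermark), and the individual checks are placeholders rather than a prescribed procedure.
#!/usr/bin/env bash
# Hypothetical stage-by-stage walkthrough of the ingestion control path.
set -euo pipefail
# Context bootstrap: confirm the server is reachable before anything else
clickhouse-client --query="SELECT 1" > /dev/null
# Input normalization: confirm the staging table actually received rows
clickhouse-client --query="SELECT count() FROM events_staging"
# Core execution: confirm the most recent promotion into events wrote rows
clickhouse-client --query="
    SELECT written_rows
    FROM system.query_log
    WHERE type = 'QueryFinish' AND query LIKE 'INSERT INTO events%'
    ORDER BY event_time DESC
    LIMIT 1"
# Policy and safety checks: confirm invalid event types are not leaking through
clickhouse-client --query="SELECT countIf(event_type NOT IN ('login', 'logout', 'purchase')) FROM events"
# Output composition and telemetry: confirm the watermark is advancing
clickhouse-client --query="SELECT max(watermark) FROM events_watermark"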
Where possible, verify implementation details against the upstream source repository while reading this chapter.
Suggested trace strategy:
- search the upstream code for the SELECT and timestamp handling shown here to map concrete implementation paths
- compare documentation claims against the actual runtime and configuration code before reusing these patterns in production