Skip to content

Latest commit

 

History

History
475 lines (368 loc) · 13.5 KB

File metadata and controls

475 lines (368 loc) · 13.5 KB

ClickHouse-Specific Patterns

This guide covers ClickHouse features that CH.Toolkit supports through its schema modeling and DDL compilation.

Engine Selection

MergeTree Family

The MergeTree family is the core of ClickHouse's storage. Every engine in this family requires an ORDER BY (sorting key).

// Basic MergeTree -- general purpose
.MergeTree()

// ReplacingMergeTree -- deduplicates by ORDER BY key
// Optional version column: keeps row with highest version
.ReplacingMergeTree()
.ReplacingMergeTree(e => e.Version)

// SummingMergeTree -- auto-sums numeric columns during merges
.SummingMergeTree()
.SummingMergeTree(e => e.Amount, e => e.Count)

// AggregatingMergeTree -- for materialized views with -State/-Merge aggregates
.AggregatingMergeTree()

// CollapsingMergeTree -- collapses rows using sign column (+1/-1)
.CollapsingMergeTree(e => e.Sign)

// VersionedCollapsingMergeTree -- collapsing with version for out-of-order inserts
.VersionedCollapsingMergeTree(e => e.Sign, e => e.Version)

// GraphiteMergeTree -- for Graphite metrics storage
.GraphiteMergeTree("graphite_rollup")

Replicated Engines

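Add replication by prefixing the engine name with Replicated. The Keeper path and replica name parameters are optional: when they are omitted, ClickHouse uses the server's default_replica_path and default_replica_name settings. In paths, the {database} and {table} macros are expanded automatically, while {shard} and {replica} must be defined in the <macros> section of each server's configuration.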

.ReplicatedMergeTree()
.ReplicatedMergeTree("/clickhouse/tables/{shard}/{database}/{table}", "{replica}")
.ReplicatedReplacingMergeTree(versionColumn: e => e.Version)
.ReplicatedSummingMergeTree()
.ReplicatedAggregatingMergeTree()
.ReplicatedCollapsingMergeTree(signColumn: e => e.Sign)
.ReplicatedVersionedCollapsingMergeTree(signColumn: e => e.Sign, versionColumn: e => e.Version)
.ReplicatedGraphiteMergeTree(configSection: "graphite_rollup")

// All replicated variants accept optional cluster parameter
.ReplicatedMergeTree(cluster: "my_cluster")

Log Family

Simple append-only engines for small tables (up to ~1M rows). No indexes, no replication.

.Log()          // Concurrent reads, blocking writes
.StripeLog()    // Similar to Log, single file storage
.TinyLog()      // Simplest, no concurrent reads

Special Engines

.Memory()                        // In-memory, lost on restart
.Null()                          // /dev/null -- accepts writes, returns nothing
.SetEngine()                     // In-memory set for IN subqueries
.EmbeddedRocksDB()               // RocksDB-backed key-value storage
.EmbeddedRocksDB(ttl: 3600)      // With TTL in seconds

Utility Engines

// Buffer -- batches inserts before flushing to target table
.Buffer("analytics", "events", numLayers: 16,
    minTime: 10, maxTime: 100,
    minRows: 10000, maxRows: 1000000,
    minBytes: 10000000, maxBytes: 100000000)

// Distributed -- sharded reads/writes across a cluster
.Distributed("my_cluster", "analytics", "events")
.Distributed("my_cluster", "analytics", "events", shardingKey: "sipHash64(user_id)")

// Merge -- unions tables matching a regex pattern
.Merge("analytics", "^events_\\d{4}$")

// File -- read/write files in various formats
.File("CSV")
.File("Parquet")

// Join -- in-memory hash table for JOIN operations
.Join("ANY", "LEFT", "user_id")

// URL -- external data via HTTP
.Url("https://example.com/data.csv", "CSV")

// GenerateRandom -- test data generator
.GenerateRandom()
.GenerateRandom(randomSeed: 42, maxStringLength: 100)

// KeeperMap -- distributed key-value using ClickHouse Keeper
.KeeperMap("/keeper/path")
.KeeperMap("/keeper/path", keysLimit: 1000)

Custom Engine

For engines not covered by the fluent API:

// Structured (name + args)
.Engine("MaterializedPostgreSQL", "'host:5432'", "'db'", "'table'")

// Raw expression
.Engine("ReplicatedMergeTree('/custom/path', '{replica}')")

Materialized Views

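Materialized views act like insert triggers. Each block inserted into the source table (the table in the view's FROM clause) is run through the view's query, and the result is stored. Data that was already in the source table before the view was created is not processed.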

Basic View (with separate target table)

var schema = new SchemaBuilder()
    .Table<Event>("events")
        .MergeTree()
        .OrderBy(e => e.Timestamp)
    .MaterializedView("hourly_events")
        .SelectQuery(@"
            SELECT
                toStartOfHour(timestamp) AS hour,
                event_type,
                count() AS cnt
            FROM events
            GROUP BY hour, event_type")
        .ToTable("hourly_events_target")
        .SummingMergeTree()
        .OrderBy("hour", "event_type")
    .Build("analytics");

View with Inline Engine (no explicit target table; ClickHouse stores the data in a hidden inner table)

.MaterializedView("daily_stats")
    .SelectQuery("SELECT toDate(timestamp) AS day, count() AS cnt FROM events GROUP BY day")
    .MergeTree()
    .OrderBy("day")
    .Build("analytics");

View with POPULATE

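POPULATE fills the view with the data already in the source table at creation time. Rows inserted into the source table while the view is being populated are not included, and POPULATE cannot be combined with a TO target table (ToTable). For tables that are receiving writes, prefer creating the view without POPULATE and backfilling with a manual INSERT ... SELECT.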

.MaterializedView("backfilled_view")
    .SelectQuery("SELECT ...")
    .MergeTree()
    .OrderBy("day")
    .Populate()
    .Build("analytics");

View with Typed Query Builder

.MaterializedView("typed_view")
    .Select<Event>(q => q
        .GroupBy(e => e.EventType)
        .Select(g => new { EventType = g.Key, Count = g.Count() }))
    .MergeTree()
    .OrderBy("event_type")
    .Build("analytics");

View with Fluent Query Builder

.MaterializedView("fluent_view")
    .Select(q => q
        .From("events")
        .Select("toStartOfHour(timestamp) AS hour", "count() AS cnt")
        .GroupBy("hour"))
    .MergeTree()
    .OrderBy("hour")
    .Build("analytics");

External Dictionaries

Dictionaries provide key-value lookups that ClickHouse can use in queries via dictGet().

ClickHouse Source

var schema = new SchemaBuilder()
    .Dictionary("user_names")
        .PrimaryKey("user_id")
        .Attribute("name", "String")
        .Attribute("email", "String", defaultExpression: "''")
        .SourceClickHouse("reference_db", "users",
            host: "ch-node-1", port: "9000",
            user: "readonly", password: "secret")
        .Hashed()
        .Lifetime(60, 300)
    .Build("analytics");

Layout Types

.Flat()                              // FLAT -- simple array, fastest for small datasets
.Hashed()                            // HASHED -- hash table, good for most use cases
.Cache(sizeInCells: 50000)           // CACHE(SIZE_IN_CELLS 50000) -- LRU cache
.ComplexKeyHashed()                  // COMPLEX_KEY_HASHED -- composite key support

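For other layouts, use the generic method. The dictionary entries are options of the layout itself:

.Layout("RANGE_HASHED", new Dictionary<string, string>
{
    ["range_lookup_strategy"] = "max"
})

The range columns of a RANGE_HASHED dictionary are not layout options. ClickHouse declares them in a separate RANGE(MIN start_date MAX end_date) clause of CREATE DICTIONARY.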

Custom Source

.Source("HTTP", new Dictionary<string, string>
{
    ["url"] = "https://api.example.com/data",
    ["format"] = "JSONEachRow"
})

Dictionary Attributes

.Attribute("name", "String")
.Attribute("region", "String", defaultExpression: "'unknown'")
.Attribute("parent_id", "UInt64", isHierarchical: true)    // For hierarchical dictionaries
.Attribute("code", "String", isInjective: true)            // 1-to-1 mapping optimization

Using Dictionaries in Queries

// LINQ
.Select(e => new
{
    e.UserId,
    UserName = Ch.DictGet<string>("user_names", "name", e.UserId)
})

// Check existence
.Where(e => Ch.DictHas("user_names", e.UserId))

// With default fallback
.Select(e => Ch.DictGetOrDefault<string>("user_names", "name", e.UserId, "unknown"))

ON CLUSTER DDL

For distributed setups, DDL operations can be replicated across cluster nodes.

Global Cluster Setting

Apply to all DDL via DdlOptions:

var compiler = new DdlCompiler(new DdlOptions { ClusterName = "production" });
// All compiled DDL includes ON CLUSTER 'production'

Via MigrationEngineOptions

var engine = new MigrationEngine(new MigrationEngineOptions
{
    ConnectionString = "Host=localhost;Port=9000",
    Database = "analytics",
    ClusterName = "production"    // Applied to all migration DDL
});

Per-Table Override

Override the global cluster on specific tables via the schema model:

// At the TableSchema level
new TableSchema("events", columns, engine, OnCluster: "special_cluster")

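Generated SQL with the global cluster set to production (a table with a per-table OnCluster override uses that cluster name instead, e.g. ON CLUSTER 'special_cluster'):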

CREATE TABLE IF NOT EXISTS analytics.events ON CLUSTER 'production' (...)
ALTER TABLE analytics.events ON CLUSTER 'production' ...
DROP TABLE IF EXISTS analytics.events ON CLUSTER 'production'
CREATE MATERIALIZED VIEW ... ON CLUSTER 'production' AS ...
CREATE DICTIONARY ... ON CLUSTER 'production' (...)

ReplacingMergeTree with Version Columns

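ReplacingMergeTree removes rows that share the same ORDER BY (sorting) key during background merges. Merges run at unspecified times and only combine rows within the same partition, so duplicates can remain visible. A version column controls which row survives: the row with the highest version is kept. Without a version column, the last inserted row is kept.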

var schema = new SchemaBuilder()
    .Table<User>("users")
        .ReplacingMergeTree(u => u.Version)    // Keep row with highest version
        .OrderBy(u => u.UserId)
    .Build("analytics");

To get deduplicated results at query time (before merges complete), use FINAL:

var sql = Query.From<User>()
    .Final()
    .Where(u => u.UserId == 42)
    .ToSql();
// SELECT * FROM users FINAL WHERE (user_id = 42)

TTL Configuration

Table-Level TTL

// Delete rows after 90 days
.Ttl("created_at + INTERVAL 90 DAY", "DELETE")

// Move to cold storage after 30 days
.Ttl("created_at + INTERVAL 30 DAY", "TO DISK 'cold'")

// Move to different volume
.Ttl("created_at + INTERVAL 7 DAY", "TO VOLUME 'archive'")

Column-Level TTL

.Column(e => e.DetailedPayload)
    .Ttl("timestamp + INTERVAL 365 DAY")
    .Table

Multiple TTL Rules

Multiple TTL rules can be applied to a table via the schema model:

new EngineSchema("MergeTree",
    OrderBy: ["timestamp"],
    Ttl: [
        new TtlClause("timestamp + INTERVAL 30 DAY", "TO DISK 'warm'"),
        new TtlClause("timestamp + INTERVAL 90 DAY", "TO DISK 'cold'"),
        new TtlClause("timestamp + INTERVAL 365 DAY", "DELETE")
    ])

Skipping Indexes

Skipping indexes let ClickHouse skip blocks of granules that definitely don't match a query condition. The GRANULARITY value is the number of table granules summarized by each index entry.

var schema = new SchemaBuilder()
    .Table<Event>("events")
        .MergeTree()
        .OrderBy(e => e.Timestamp)
        .Index("idx_user", "user_id", "minmax", 4)
        .Index("idx_payload", "payload", "tokenbf_v1(10240, 3, 0)", 2)
        .Index("idx_status", "status", "set(100)", 4)
        .Index("idx_tags", "tags", "bloom_filter(0.01)", 3)
    .Build("analytics");

Generated SQL:

INDEX idx_user user_id TYPE minmax GRANULARITY 4,
INDEX idx_payload payload TYPE tokenbf_v1(10240, 3, 0) GRANULARITY 2,
INDEX idx_status status TYPE set(100) GRANULARITY 4,
INDEX idx_tags tags TYPE bloom_filter(0.01) GRANULARITY 3

Common index types:

  • minmax -- stores min/max per granule, good for range queries
  • set(max_rows) -- stores unique values, good for equality checks
  • bloom_filter(false_positive_rate) -- probabilistic, good for strings/arrays
  • tokenbf_v1(size, hashes, seed) -- tokenized bloom filter for full-text-like searches

Projections

Projections store an additional copy of the table's data, re-sorted or pre-aggregated, inside each data part.

.Projection("proj_by_user",
    "SELECT user_id, count() GROUP BY user_id")

.Projection("proj_daily",
    "SELECT toDate(timestamp) AS day, sum(value) GROUP BY day ORDER BY day")

ClickHouse automatically uses projections when they can satisfy a query more efficiently.

Compression Codecs

Specify compression per column for storage optimization:

.Column(e => e.Payload).Codec("ZSTD(3)").Table          // High compression
.Column(e => e.Timestamp).Codec("DoubleDelta, LZ4").Table // Time series optimization
.Column(e => e.Value).Codec("Gorilla, ZSTD(1)").Table    // Float optimization
.Column(e => e.Id).Codec("Delta(4), ZSTD").Table         // Monotonic integers

Common codecs:

  • LZ4 -- fast, moderate compression (default)
  • ZSTD(level) -- higher compression, slower (level 1-22)
  • DoubleDelta -- for monotonically increasing sequences (timestamps, counters)
  • Gorilla -- for floating-point values with small changes
  • Delta(bytes) -- stores differences between consecutive values
  • T64 -- for integer types, transposes bits
  • Codecs can be chained: DoubleDelta, LZ4 applies DoubleDelta first, then LZ4

Complete Example

Putting it all together:

var schema = new SchemaBuilder()
    .Table<Event>("events")
        .ReplicatedMergeTree(cluster: "production")
        .OrderBy(e => e.EventType, e => e.Timestamp, e => e.UserId)
        .PartitionByMonth(e => e.Timestamp)
        .SampleBy(e => e.UserId)
        .Column(e => e.EventType).LowCardinalityString().Table
        .Column(e => e.Payload).Codec("ZSTD(3)").Table
        .Column(e => e.Timestamp).Codec("DoubleDelta, LZ4").Comment("Event time").Table
        .Ttl("timestamp + INTERVAL 365 DAY", "DELETE")
        .Index("idx_user", "user_id", "minmax", 4)
        .Projection("proj_hourly",
            "SELECT toStartOfHour(timestamp) AS hour, event_type, count() GROUP BY hour, event_type")
        .Setting("index_granularity", "8192")
        .Comment("Main event stream")
    .MaterializedView("hourly_events")
        .SelectQuery(@"
            SELECT toStartOfHour(timestamp) AS hour,
                   event_type,
                   countState() AS cnt,
                   uniqState(user_id) AS users
            FROM events
            GROUP BY hour, event_type")
        .ReplicatedAggregatingMergeTree(cluster: "production")
        .OrderBy("hour", "event_type")
    .Dictionary("event_labels")
        .PrimaryKey("event_type")
        .Attribute("label", "String")
        .Attribute("category", "String")
        .SourceClickHouse("reference", "event_metadata")
        .Hashed()
        .Lifetime(300, 600)
    .Build("analytics");