
Migration Workflow

CH.Toolkit provides auto-diffing migrations with safety policies, distributed locking, checksum validation, and lifecycle hooks.

End-to-End Walkthrough

1. Define your model

public class Event
{
    public DateTime Timestamp { get; set; }
    public string UserId { get; set; } = "";
    public string EventType { get; set; } = "";
    public int Value { get; set; }
}

2. Build the desired schema

var schema = new SchemaBuilder()
    .Table<Event>()
        .MergeTree()
        .OrderBy(e => e.Timestamp, e => e.UserId)
        .PartitionByMonth(e => e.Timestamp)
        .Column(e => e.EventType).LowCardinalityString().Table
    .Build("analytics");

3. Diff against the live database

var engine = new MigrationEngine(new MigrationEngineOptions
{
    ConnectionString = "Host=localhost;Port=9000",
    Database = "analytics"
});

var operations = await engine.DiffAsync(schema);
// Returns: [CreateTableOp(event), ...]

4. Generate migration files

var generated = engine.GenerateMigration("InitialCreate", operations);
// Writes C# migration classes to disk

Or use the CLI:

chsharp add-migration --name InitialCreate

5. Apply migrations

await engine.MigrateAsync(migrationsAssembly);

Or use the CLI:

chsharp migrate

MigrationBase

All migrations inherit from MigrationBase:

public abstract class MigrationBase
{
    public abstract string Id { get; }                          // Unique ID (typically timestamp)
    public abstract string Name { get; }                        // Human-readable name
    public abstract void Up(MigrationBuilder builder);          // Forward operations
    public abstract void Down(MigrationBuilder builder);        // Reverse operations (for documentation)
}

The framework calls Up() during forward migration. Down() is never called automatically; it exists for documentation and manual rollback scenarios.

MigrationBuilder API

Table Operations

builder.CreateTable("events", table =>
{
    table.Column("timestamp", "DateTime");
    table.Column("user_id", "String");
    table.Column("value", "Int32", c => c.Default("0").SetCodec("ZSTD(3)"));
    table.Engine("MergeTree");
    table.OrderBy("timestamp", "user_id");
    table.PartitionBy("toYYYYMM(timestamp)");
});

builder.DropTable("old_events");

Column Operations

builder.AddColumn("events", "category", "LowCardinality(String)",
    c => c.Default("'unknown'").SetComment("Event category").SetCodec("ZSTD"));

builder.DropColumn("events", "old_field");
builder.AlterColumn("events", "value", "Int64");
builder.RenameColumn("events", "user_id", "uid");

Index Operations

builder.AddIndex("events", "idx_user", "user_id", "minmax", 4);
builder.DropIndex("events", "idx_user");

Projection Operations

builder.AddProjection("events", "proj_by_user",
    "SELECT user_id, count() GROUP BY user_id");
builder.DropProjection("events", "proj_by_user");

Materialized View Operations

builder.CreateMaterializedView("hourly_events",
    "SELECT toStartOfHour(timestamp) AS hour, count() AS cnt FROM events GROUP BY hour",
    mv =>
    {
        mv.Engine("SummingMergeTree");
        mv.OrderBy("hour");
    });

builder.DropMaterializedView("hourly_events");

Dictionary Operations

builder.CreateDictionary("user_names", dict =>
{
    dict.PrimaryKey("user_id");
    dict.Attribute("name", "String");
    dict.SourceClickHouse("analytics", "users");
    dict.Hashed();
    dict.Lifetime(60, 300);
});

builder.DropDictionary("user_names");

Raw SQL & Custom Operations

builder.Sql("OPTIMIZE TABLE events FINAL");
builder.Custom(new RawSqlOp("SYSTEM RELOAD DICTIONARY user_names"));

Safety Policies

By default, the migration runner blocks destructive operations. Each flag must be explicitly enabled:

| Flag | Default | Controls |
| --- | --- | --- |
| AllowDropTable | false | DROP TABLE operations |
| AllowDropColumn | false | DROP COLUMN operations |
| AllowEngineChange | false | Engine modification attempts |
| AllowTypeNarrowing | false | Type changes that may lose data (e.g., UInt64 to UInt32, String to FixedString) |
| AllowDropMaterializedView | false | DROP MATERIALIZED VIEW and structural view changes |
| AllowDropDictionary | false | DROP DICTIONARY operations |

Overriding Safety Policies

var engine = new MigrationEngine(new MigrationEngineOptions
{
    ConnectionString = "Host=localhost;Port=9000",
    Database = "analytics",
    SafetyPolicy = SafetyPolicy.Default with
    {
        AllowDropColumn = true,
        AllowDropTable = true
    }
});

The policy validates all operations before any are executed. If any operation is blocked, the entire migration fails with a BlockedOperation list explaining why.
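
The validate-everything-first behavior can be sketched as follows. This is a minimal illustration, not the library's implementation: `PolicySketch`, `OpSketch`, `BlockedSketch`, and the `InvalidOperationException` are stand-ins invented for the example, and only two of the flags are modeled.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Stand-in types for illustration only; they are not the library's real definitions.
public sealed record PolicySketch(bool AllowDropTable = false, bool AllowDropColumn = false);
public sealed record OpSketch(string Kind, string Target);
public sealed record BlockedSketch(OpSketch Operation, string Reason);

public static class SafetyValidationSketch
{
    // Collects every blocked operation instead of stopping at the first one.
    public static IReadOnlyList<BlockedSketch> Validate(PolicySketch policy, IEnumerable<OpSketch> ops)
    {
        var blocked = new List<BlockedSketch>();
        foreach (var op in ops)
        {
            if (op.Kind == "DropTable" && !policy.AllowDropTable)
                blocked.Add(new BlockedSketch(op, "AllowDropTable is false"));
            else if (op.Kind == "DropColumn" && !policy.AllowDropColumn)
                blocked.Add(new BlockedSketch(op, "AllowDropColumn is false"));
        }
        return blocked;
    }

    // Executes nothing unless the whole batch passes validation.
    public static void Run(PolicySketch policy, IReadOnlyList<OpSketch> ops, Action<OpSketch> execute)
    {
        var blocked = Validate(policy, ops);
        if (blocked.Count > 0)
        {
            var details = blocked.Select(b => $"{b.Operation.Kind} {b.Operation.Target} ({b.Reason})");
            throw new InvalidOperationException("Blocked: " + string.Join("; ", details));
        }
        foreach (var op in ops) execute(op);
    }
}
```

Because validation runs over the full batch before the first `execute` call, a blocked DROP COLUMN at the end of a migration cannot leave earlier operations half-applied.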

Custom Validation Rules

Add domain-specific rules via CustomRules:

SafetyPolicy = SafetyPolicy.Default with
{
    CustomRules = [myCustomRule]
}

Lock Table Mechanism

The migration runner uses a distributed TTL-based lock to prevent concurrent migrations across instances.

Lock table schema (__migration_lock):

CREATE TABLE IF NOT EXISTS {database}.__migration_lock
(
    owner      String,
    heartbeat  DateTime64(3) DEFAULT now64(3),
    released   UInt8 DEFAULT 0
)
ENGINE = MergeTree()
ORDER BY (owner)
TTL heartbeat + INTERVAL 2 MINUTES

Behavior:

  1. Insert a lock row with owner = {MachineName}:{ProcessId}:{GUID}
  2. Wait for merge (configurable PostInsertDelay, default 2s)
  3. Verify ownership by re-reading the lock table
  4. Start heartbeat (default every 30s) to prevent TTL cleanup
  5. On completion, set released = 1
  6. Stale locks auto-expire after the TTL interval (default 2 minutes)
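
Two pieces of the behavior above are easy to show in isolation: the owner string and the staleness test. The sketch below assumes that "stale" means the last heartbeat is older than the threshold, which matches the TTL expression but is not confirmed by the source; `MigrationLockSketch` and its members are hypothetical names.

```csharp
using System;

public static class MigrationLockSketch
{
    // Builds an owner string in the {MachineName}:{ProcessId}:{GUID} shape.
    // The GUID keeps two runs inside the same process distinguishable.
    public static string NewOwnerId() =>
        $"{Environment.MachineName}:{Environment.ProcessId}:{Guid.NewGuid()}";

    // Assumption: a lock row is stale once its last heartbeat is older than the threshold.
    public static bool IsStale(DateTime lastHeartbeatUtc, DateTime nowUtc, TimeSpan stalenessThreshold) =>
        nowUtc - lastHeartbeatUtc > stalenessThreshold;
}
```

With the defaults, a holder that heartbeats every 30 seconds has to miss several consecutive heartbeats before its lock passes the 2-minute threshold.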

Configuration via MigrationLockOptions:

| Option | Default | Description |
| --- | --- | --- |
| HeartbeatInterval | 30s | How often to update the heartbeat |
| StalenessThreshold | 2 min | When a lock is considered stale |
| MaxRetries | 10 | Retry attempts for lock acquisition |
| RetryDelay | 5s | Wait between retry attempts |
| PostInsertDelay | 2s | Wait after insert for merge |
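
How MaxRetries and RetryDelay interact can be sketched as a plain retry loop. This is an illustration under assumptions: `tryAcquire` is a hypothetical delegate standing in for "insert the lock row, wait PostInsertDelay, re-read and verify ownership", and the sketch assumes MaxRetries counts retries after one initial attempt, which the source does not specify.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class LockRetrySketch
{
    // Returns true as soon as tryAcquire succeeds, false once all attempts are used up.
    public static async Task<bool> AcquireAsync(
        Func<CancellationToken, Task<bool>> tryAcquire,
        int maxRetries,
        TimeSpan retryDelay,
        CancellationToken ct = default)
    {
        // Assumption: one initial attempt plus up to maxRetries retries.
        for (int attempt = 0; attempt <= maxRetries; attempt++)
        {
            if (await tryAcquire(ct)) return true;

            // No point in sleeping after the final failed attempt.
            if (attempt < maxRetries) await Task.Delay(retryDelay, ct);
        }
        return false;
    }
}
```

Under that reading, the defaults (10 retries, 5s apart) give a waiting process roughly 50 seconds plus attempt time before it gives up.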

Checksum Validation

Each migration's operations are checksummed (SHA-256), and the checksum is stored in the history table. On every migration run, the framework compares the recorded checksums against the current migration code.

If any checksum does not match, the entire migration run is blocked. This prevents a migration that was edited after it was applied from silently diverging from the recorded history.

The checksum is computed by calling migration.Up(builder), serializing all resulting operations, and hashing the output.
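
The hashing step can be sketched like this. The serialization format the framework uses is not documented here, so the example takes already-serialized operation strings as input; `MigrationChecksum` and the newline-joined payload are assumptions made for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Security.Cryptography;
using System.Text;

public static class MigrationChecksum
{
    // Hypothetical helper: hashes one canonical string per operation.
    public static string Compute(IEnumerable<string> serializedOperations)
    {
        // Join with a newline so that operation order and boundaries affect the hash.
        string payload = string.Join("\n", serializedOperations);
        byte[] hash = SHA256.HashData(Encoding.UTF8.GetBytes(payload));
        return Convert.ToHexString(hash); // 64 hexadecimal characters
    }

    // True when the recorded checksum still matches the current operations.
    public static bool Matches(string recorded, IEnumerable<string> serializedOperations) =>
        string.Equals(recorded, Compute(serializedOperations), StringComparison.OrdinalIgnoreCase);
}
```

Since the hash covers the operations produced by Up(), changes that do not alter those operations (comments, formatting, or the body of Down()) would presumably leave the checksum unchanged.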

History table (_chsharp_migrations):

CREATE TABLE IF NOT EXISTS {database}._chsharp_migrations
(
    id          String,
    name        String,
    checksum    String,
    applied_at  DateTime64(3) DEFAULT now64(3)
)
ENGINE = MergeTree()
ORDER BY (id)
SETTINGS index_granularity = 1

Lifecycle Hooks

Implement IMigrationHook to run custom logic at key points:

public interface IMigrationHook
{
    Task BeforeAllAsync(IReadOnlyList<MigrationBase> pending, CancellationToken ct);
    Task BeforeMigrationAsync(MigrationBase migration, CancellationToken ct);
    Task AfterMigrationAsync(MigrationBase migration, bool success, CancellationToken ct);
    Task AfterAllAsync(MigrationResult result, CancellationToken ct);
}

All methods have default implementations (Task.CompletedTask), so you only need to override what you care about.

Register hooks via MigrationEngineOptions:

var engine = new MigrationEngine(new MigrationEngineOptions
{
    // ...
    Hooks = [new LoggingHook(), new SlackNotificationHook()]
});

Execution order:

  1. Acquire lock
  2. Load and validate checksums
  3. BeforeAllAsync (all pending migrations)
  4. For each migration:
    • BeforeMigrationAsync
    • Execute DDL
    • Record in history
    • AfterMigrationAsync(success: true) or AfterMigrationAsync(success: false) on error
  5. AfterAllAsync (with MigrationResult)
  6. Release lock

Handling UnsupportedEngineChangeOp

Some schema changes cannot be applied via ALTER TABLE in ClickHouse:

  • Changing the engine (e.g., MergeTree to ReplacingMergeTree)
  • Modifying engine arguments
  • Changing PARTITION BY, PRIMARY KEY, or SAMPLE BY

The differ emits UnsupportedEngineChangeOp for these cases, and the runner throws NotSupportedException:

Cannot alter EngineName on table 'events' (current: 'MergeTree', desired: 'ReplacingMergeTree').
This change requires a manual table rebuild.

Manual resolution: Create a new table with the desired engine, INSERT INTO ... SELECT FROM the old table, drop the old table, and rename the new one.
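
The rebuild steps can be written down as an ordered list of statements. This is a sketch, not a tool the library provides: `TableRebuildSketch` and the `_new` suffix are hypothetical, the CREATE TABLE statement for the new table is supplied by the caller, and `SELECT *` assumes both tables have the same columns. Rows written to the old table while the copy runs are not carried over, so writes should be paused first.

```csharp
using System.Collections.Generic;

public static class TableRebuildSketch
{
    // createNewTableSql must be the full CREATE TABLE statement for {database}.{table}_new
    // with the desired engine; it is taken as input rather than generated.
    public static IReadOnlyList<string> BuildStatements(string database, string table, string createNewTableSql)
    {
        string oldName = $"{database}.{table}";
        string newName = $"{database}.{table}_new";
        return new[]
        {
            createNewTableSql,                                 // 1. create the replacement table
            $"INSERT INTO {newName} SELECT * FROM {oldName}",  // 2. copy the data
            $"DROP TABLE {oldName}",                           // 3. drop the old table
            $"RENAME TABLE {newName} TO {oldName}",            // 4. move the new table into place
        };
    }
}
```

Each statement could be passed to builder.Sql(...) in a hand-written migration, with AllowDropTable enabled for the DROP step if the safety policy inspects it.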

Schema Operations

The differ produces these operation types:

| Operation | Description |
| --- | --- |
| CreateTableOp | Create a new table |
| DropTableOp | Drop a table |
| AddColumnOp | Add a column (with optional After positioning) |
| DropColumnOp | Drop a column |
| AlterColumnOp | Change a column's type, default, codec, or TTL |
| RenameColumnOp | Rename a column |
| AddIndexOp | Add a skipping index |
| DropIndexOp | Drop a skipping index |
| AddProjectionOp | Add a projection |
| DropProjectionOp | Drop a projection |
| ModifyOrderByOp | Change ORDER BY |
| ModifyTtlOp | Change table TTL |
| ModifySettingOp | Change an engine setting |
| ResetSettingOp | Reset an engine setting |
| ModifyTableCommentOp | Change table comment |
| ModifyColumnCommentOp | Change column comment |
| CreateMaterializedViewOp | Create a materialized view |
| DropMaterializedViewOp | Drop a materialized view |
| AlterMaterializedViewOp | Alter a materialized view's SELECT |
| CreateDictionaryOp | Create a dictionary |
| DropDictionaryOp | Drop a dictionary |
| UnsupportedEngineChangeOp | Engine change requiring manual rebuild |
| RawSqlOp | Raw SQL statement |

Custom Operation Handlers

For operations that need custom SQL compilation:

public interface ICustomOperationHandler
{
    bool CanHandle(SchemaOperation operation);
    IReadOnlyList<string> CompileSql(SchemaOperation operation, string database);
}

Register via MigrationEngineOptions.CustomOperationHandlers. When a handler matches, its SQL replaces the default compilation path.
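
The "handler replaces the default path" rule can be sketched as a simple dispatch. Everything below is a stand-in so the example compiles on its own: `OperationSketch`, `RawSqlSketch`, and `IHandlerSketch` only mirror the documented shapes, `SplitStatementsHandler` is an invented example handler, and picking the first matching handler is an assumption about how ties are resolved.

```csharp
using System;
using System.Collections.Generic;

// Stand-ins mirroring the documented shapes; not the library's real types.
public abstract record OperationSketch;
public sealed record RawSqlSketch(string Sql) : OperationSketch;

public interface IHandlerSketch
{
    bool CanHandle(OperationSketch operation);
    IReadOnlyList<string> CompileSql(OperationSketch operation, string database);
}

// Example handler: turns one multi-statement raw SQL operation into separate statements.
public sealed class SplitStatementsHandler : IHandlerSketch
{
    public bool CanHandle(OperationSketch operation) => operation is RawSqlSketch;

    public IReadOnlyList<string> CompileSql(OperationSketch operation, string database)
    {
        var raw = (RawSqlSketch)operation;
        // Naive split: does not account for ';' inside string literals.
        return raw.Sql.Split(';', StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries);
    }
}

public static class HandlerDispatchSketch
{
    // Returns the SQL of the first handler that accepts the operation;
    // falls back to defaultCompile when none does.
    public static IReadOnlyList<string> Compile(
        OperationSketch operation,
        string database,
        IEnumerable<IHandlerSketch> handlers,
        Func<OperationSketch, string, IReadOnlyList<string>> defaultCompile)
    {
        foreach (var handler in handlers)
        {
            if (handler.CanHandle(operation))
                return handler.CompileSql(operation, database);
        }
        return defaultCompile(operation, database);
    }
}
```

Returning a list rather than a single string lets one operation expand into several statements, as the example handler does.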

Migration File Naming

Generated migrations follow the pattern: {Timestamp}_{Sequence}_{DescriptiveName}.cs

The migration ID is {Timestamp}_{Sequence} (e.g., 20240115120000_001). Migrations are applied in ID order.
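
The naming scheme and ordering can be sketched as follows. The `yyyyMMddHHmmss` timestamp format and the three-digit zero-padded sequence are inferred from the example ID above, and ordinal string comparison is an assumption about how "ID order" is defined; `MigrationNamingSketch` is a hypothetical helper.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;

public static class MigrationNamingSketch
{
    // ID in the {Timestamp}_{Sequence} shape, e.g. 20240115120000_001.
    public static string BuildId(DateTime timestamp, int sequence) =>
        timestamp.ToString("yyyyMMddHHmmss", CultureInfo.InvariantCulture) + "_" +
        sequence.ToString("D3", CultureInfo.InvariantCulture);

    // File name in the {Timestamp}_{Sequence}_{DescriptiveName}.cs shape.
    public static string BuildFileName(string id, string descriptiveName) =>
        $"{id}_{descriptiveName}.cs";

    // Fixed-width IDs sort chronologically under ordinal comparison.
    public static IReadOnlyList<string> InApplyOrder(IEnumerable<string> ids) =>
        ids.OrderBy(id => id, StringComparer.Ordinal).ToList();
}
```

The fixed width matters: without zero padding, a sequence of 10 would sort before 2 under string comparison.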

MigrationEngine API

// Diff desired schema against live database
Task<IReadOnlyList<SchemaOperation>> DiffAsync(DatabaseSchema desiredSchema, CancellationToken ct)

// Generate C# migration files from operations
GeneratedMigrationSet GenerateMigration(string name, IReadOnlyList<SchemaOperation> operations)

// Apply pending migrations
Task<MigrationResult> MigrateAsync(IReadOnlyList<MigrationBase> migrations, CancellationToken ct)

// Check which migrations haven't been applied
Task<IReadOnlyList<MigrationInfo>> GetPendingAsync(IReadOnlyList<MigrationBase> migrations, CancellationToken ct)

MigrationResult

public sealed record MigrationResult(
    IReadOnlyList<string> Applied,
    IReadOnlyList<string> Skipped,
    bool Success,
    string? Error = null)
{
    public IReadOnlyList<StepResult> Steps { get; init; }
    public StepResult? FailedStep { get; init; }
}

public sealed record StepResult(
    string MigrationId,
    string MigrationName,
    StepStatus Status,       // Applied, Skipped, or Failed
    string? Error = null);