CH.Toolkit provides auto-diffing migrations with safety policies, distributed locking, checksum validation, and lifecycle hooks.
public class Event
{
public DateTime Timestamp { get; set; }
public string UserId { get; set; } = "";
public string EventType { get; set; } = "";
public int Value { get; set; }
}

var schema = new SchemaBuilder()
.Table<Event>()
.MergeTree()
.OrderBy(e => e.Timestamp, e => e.UserId)
.PartitionByMonth(e => e.Timestamp)
.Column(e => e.EventType).LowCardinalityString().Table
.Build("analytics");

var engine = new MigrationEngine(new MigrationEngineOptions
{
ConnectionString = "Host=localhost;Port=9000",
Database = "analytics"
});
var operations = await engine.DiffAsync(schema);
// Returns: [CreateTableOp(event), ...]

var generated = engine.GenerateMigration("InitialCreate", operations);
// Writes C# migration classes to disk

Or use the CLI:

chsharp add-migration --name InitialCreate

await engine.MigrateAsync(migrationsAssembly);

Or use the CLI:

chsharp migrate

All migrations inherit from MigrationBase:
public abstract class MigrationBase
{
public abstract string Id { get; } // Unique ID (typically timestamp)
public abstract string Name { get; } // Human-readable name
public abstract void Up(MigrationBuilder builder); // Forward operations
public abstract void Down(MigrationBuilder builder); // Reverse operations (for documentation)
}

The framework calls Up() during forward migration. Down() is not called automatically -- it exists for documentation and manual rollback scenarios.
builder.CreateTable("events", table =>
{
table.Column("timestamp", "DateTime");
table.Column("user_id", "String");
table.Column("value", "Int32", c => c.Default("0").SetCodec("ZSTD(3)"));
table.Engine("MergeTree");
table.OrderBy("timestamp", "user_id");
table.PartitionBy("toYYYYMM(timestamp)");
});
builder.DropTable("old_events");

builder.AddColumn("events", "category", "LowCardinality(String)",
c => c.Default("'unknown'").SetComment("Event category").SetCodec("ZSTD"));
builder.DropColumn("events", "old_field");
builder.AlterColumn("events", "value", "Int64");
builder.RenameColumn("events", "user_id", "uid");

builder.AddIndex("events", "idx_user", "user_id", "minmax", 4);
builder.DropIndex("events", "idx_user");

builder.AddProjection("events", "proj_by_user",
"SELECT user_id, count() GROUP BY user_id");
builder.DropProjection("events", "proj_by_user");

builder.CreateMaterializedView("hourly_events",
"SELECT toStartOfHour(timestamp) AS hour, count() AS cnt FROM events GROUP BY hour",
mv =>
{
mv.Engine("SummingMergeTree");
mv.OrderBy("hour");
});
builder.DropMaterializedView("hourly_events");

builder.CreateDictionary("user_names", dict =>
{
dict.PrimaryKey("user_id");
dict.Attribute("name", "String");
dict.SourceClickHouse("analytics", "users");
dict.Hashed();
dict.Lifetime(60, 300);
});
builder.DropDictionary("user_names");

builder.Sql("OPTIMIZE TABLE events FINAL");
builder.Custom(new RawSqlOp("SYSTEM RELOAD DICTIONARY user_names"));

By default, the migration runner blocks destructive operations. Each flag must be explicitly enabled:
| Flag | Default | Controls |
|---|---|---|
| `AllowDropTable` | `false` | DROP TABLE operations |
| `AllowDropColumn` | `false` | DROP COLUMN operations |
| `AllowEngineChange` | `false` | Engine modification attempts |
| `AllowTypeNarrowing` | `false` | Type changes that may lose data (e.g., UInt64 to UInt32, String to FixedString) |
| `AllowDropMaterializedView` | `false` | DROP MATERIALIZED VIEW and structural view changes |
| `AllowDropDictionary` | `false` | DROP DICTIONARY operations |

var engine = new MigrationEngine(new MigrationEngineOptions
{
ConnectionString = "Host=localhost;Port=9000",
Database = "analytics",
SafetyPolicy = SafetyPolicy.Default with
{
AllowDropColumn = true,
AllowDropTable = true
}
});

The policy validates all operations before any are executed. If any operation is blocked, the entire migration fails with a BlockedOperation list explaining why.
Add domain-specific rules via CustomRules:
SafetyPolicy = SafetyPolicy.Default with
{
CustomRules = [myCustomRule]
}

The migration runner uses a distributed TTL-based lock to prevent concurrent migrations across instances.
Lock table schema (__migration_lock):
CREATE TABLE IF NOT EXISTS {database}.__migration_lock
(
owner String,
heartbeat DateTime64(3) DEFAULT now64(3),
released UInt8 DEFAULT 0
)
ENGINE = MergeTree()
ORDER BY (owner)
TTL heartbeat + INTERVAL 2 MINUTES

Behavior:
- Insert a lock row with owner = `{MachineName}:{ProcessId}:{GUID}`
- Wait for merge (configurable `PostInsertDelay`, default 2s)
- Verify ownership by re-reading the lock table
- Start heartbeat (default every 30s) to prevent TTL cleanup
- On completion, set `released = 1`
- Stale locks auto-expire after the TTL interval (default 2 minutes)

Configuration via MigrationLockOptions:
| Option | Default | Description |
|---|---|---|
| `HeartbeatInterval` | 30s | How often to update the heartbeat |
| `StalenessThreshold` | 2 min | When a lock is considered stale |
| `MaxRetries` | 10 | Retry attempts for lock acquisition |
| `RetryDelay` | 5s | Wait between retry attempts |
| `PostInsertDelay` | 2s | Wait after insert for merge |

Each migration's operations are checksummed (SHA256) and stored in the history table. On every migration run, the framework compares recorded checksums against the current migration code.
If a checksum mismatches, the migration run is blocked entirely. This prevents silently running modified migrations that have already been applied.
The checksum is computed by calling migration.Up(builder), serializing all resulting operations, and hashing the output.
History table (_chsharp_migrations):
CREATE TABLE IF NOT EXISTS {database}._chsharp_migrations
(
id String,
name String,
checksum String,
applied_at DateTime64(3) DEFAULT now64(3)
)
ENGINE = MergeTree()
ORDER BY (id)
SETTINGS index_granularity = 1

Implement IMigrationHook to run custom logic at key points:
public interface IMigrationHook
{
Task BeforeAllAsync(IReadOnlyList<MigrationBase> pending, CancellationToken ct);
Task BeforeMigrationAsync(MigrationBase migration, CancellationToken ct);
Task AfterMigrationAsync(MigrationBase migration, bool success, CancellationToken ct);
Task AfterAllAsync(MigrationResult result, CancellationToken ct);
}

All methods have default implementations (Task.CompletedTask), so you only need to override what you care about.
Register hooks via MigrationEngineOptions:
var engine = new MigrationEngine(new MigrationEngineOptions
{
// ...
Hooks = [new LoggingHook(), new SlackNotificationHook()]
});

Execution order:
- Acquire lock
- Load and validate checksums
- `BeforeAllAsync` (all pending migrations)
- For each migration:
  - `BeforeMigrationAsync`
  - Execute DDL
  - Record in history
  - `AfterMigrationAsync(success: true)` or `AfterMigrationAsync(success: false)` on error
- `AfterAllAsync` (with `MigrationResult`)
- Release lock

Some schema changes cannot be applied via ALTER TABLE in ClickHouse:
- Changing the engine (e.g., `MergeTree` to `ReplacingMergeTree`)
- Modifying engine arguments
- Changing `PARTITION BY`, `PRIMARY KEY`, or `SAMPLE BY`

The differ emits UnsupportedEngineChangeOp for these cases, and the runner throws NotSupportedException:
Cannot alter EngineName on table 'events' (current: 'MergeTree', desired: 'ReplacingMergeTree').
This change requires a manual table rebuild.
Manual resolution: Create a new table with the desired engine, INSERT INTO ... SELECT FROM the old table, drop the old table, and rename the new one.
The differ produces these operation types:
| Operation | Description |
|---|---|
| `CreateTableOp` | Create a new table |
| `DropTableOp` | Drop a table |
| `AddColumnOp` | Add a column (with optional After positioning) |
| `DropColumnOp` | Drop a column |
| `AlterColumnOp` | Change a column's type, default, codec, or TTL |
| `RenameColumnOp` | Rename a column |
| `AddIndexOp` | Add a skipping index |
| `DropIndexOp` | Drop a skipping index |
| `AddProjectionOp` | Add a projection |
| `DropProjectionOp` | Drop a projection |
| `ModifyOrderByOp` | Change ORDER BY |
| `ModifyTtlOp` | Change table TTL |
| `ModifySettingOp` | Change an engine setting |
| `ResetSettingOp` | Reset an engine setting |
| `ModifyTableCommentOp` | Change table comment |
| `ModifyColumnCommentOp` | Change column comment |
| `CreateMaterializedViewOp` | Create a materialized view |
| `DropMaterializedViewOp` | Drop a materialized view |
| `AlterMaterializedViewOp` | Alter a materialized view's SELECT |
| `CreateDictionaryOp` | Create a dictionary |
| `DropDictionaryOp` | Drop a dictionary |
| `UnsupportedEngineChangeOp` | Engine change requiring manual rebuild |
| `RawSqlOp` | Raw SQL statement |

For operations that need custom SQL compilation:
public interface ICustomOperationHandler
{
bool CanHandle(SchemaOperation operation);
IReadOnlyList<string> CompileSql(SchemaOperation operation, string database);
}

Register via MigrationEngineOptions.CustomOperationHandlers. When a handler matches, its SQL replaces the default compilation path.
Generated migrations follow the pattern: {Timestamp}_{Sequence}_{DescriptiveName}.cs
The migration ID is {Timestamp}_{Sequence} (e.g., 20240115120000_001). Migrations are applied in ID order.
// Diff desired schema against live database
Task<IReadOnlyList<SchemaOperation>> DiffAsync(DatabaseSchema desiredSchema, CancellationToken ct)
// Generate C# migration files from operations
GeneratedMigrationSet GenerateMigration(string name, IReadOnlyList<SchemaOperation> operations)
// Apply pending migrations
Task<MigrationResult> MigrateAsync(IReadOnlyList<MigrationBase> migrations, CancellationToken ct)
// Check which migrations haven't been applied
Task<IReadOnlyList<MigrationInfo>> GetPendingAsync(IReadOnlyList<MigrationBase> migrations, CancellationToken ct)

public sealed record MigrationResult(
IReadOnlyList<string> Applied,
IReadOnlyList<string> Skipped,
bool Success,
string? Error = null)
{
public IReadOnlyList<StepResult> Steps { get; init; }
public StepResult? FailedStep { get; init; }
}
public sealed record StepResult(
string MigrationId,
string MigrationName,
StepStatus Status, // Applied, Skipped, or Failed
string? Error = null);