Skip to content

Commit 2118e6d

Browse files
committed
feat(clickhouse): use schema migrations
1 parent 19d7c40 commit 2118e6d

26 files changed

Lines changed: 443 additions & 133 deletions

File tree

app/common/clickhouse.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,54 @@ import (
1212

1313
"github.com/openmeterio/openmeter/app/config"
1414
"github.com/openmeterio/openmeter/pkg/framework/clickhouseotel"
15+
"github.com/openmeterio/openmeter/tools/migrate"
1516
)
1617

1718
var ClickHouse = wire.NewSet(
1819
NewClickHouse,
20+
wire.Struct(new(ClickHouseMigrator), "*"),
1921
)
2022

23+
// ClickHouseMigrator executes ClickHouse schema migrations.
24+
type ClickHouseMigrator struct {
25+
Config config.ClickHouseAggregationConfiguration
26+
Logger *slog.Logger
27+
}
28+
29+
func (m ClickHouseMigrator) Migrate(ctx context.Context) error {
30+
if !m.Config.AutoMigrate.Enabled() {
31+
m.Logger.Debug("clickhouse auto migration is disabled")
32+
return nil
33+
}
34+
35+
m.Logger.Debug("running clickhouse migrations", slog.String("strategy", string(m.Config.AutoMigrate)))
36+
37+
migrator, err := migrate.New(migrate.MigrateOptions{
38+
ConnectionString: m.Config.AsMigrateURL(),
39+
Migrations: migrate.ClickHouseMigrationsConfig,
40+
Logger: m.Logger,
41+
})
42+
if err != nil {
43+
return fmt.Errorf("failed to create clickhouse migrator: %w", err)
44+
}
45+
defer migrator.CloseOrLogError()
46+
47+
switch m.Config.AutoMigrate {
48+
case config.ClickHouseAutoMigrateMigration:
49+
if err := migrator.Up(); err != nil {
50+
return fmt.Errorf("failed to migrate clickhouse: %w", err)
51+
}
52+
case config.ClickHouseAutoMigrateMigrationJob:
53+
if err := migrator.WaitForMigrationJob(); err != nil {
54+
return fmt.Errorf("failed to wait for clickhouse migration job: %w", err)
55+
}
56+
}
57+
58+
m.Logger.Info("clickhouse database initialized")
59+
60+
return nil
61+
}
62+
2163
func NewClickHouse(ctx context.Context, conf config.ClickHouseAggregationConfiguration, tracer trace.Tracer, meter metric.Meter, logger *slog.Logger) (clickhouse.Conn, func(), error) {
2264
noopClose := func() {}
2365

app/config/aggregation.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"crypto/tls"
55
"errors"
66
"fmt"
7+
"net/url"
78
"time"
89

910
"github.com/ClickHouse/clickhouse-go/v2"
@@ -79,6 +80,9 @@ type ClickHouseAggregationConfiguration struct {
7980
PoolMetrics ClickhousePoolMetricsConfig
8081

8182
Retry ClickhouseQueryRetryConfig
83+
84+
// AutoMigrate controls whether ClickHouse schema migrations run automatically on startup.
85+
AutoMigrate ClickHouseAutoMigrate `yaml:"autoMigrate"`
8286
}
8387

8488
// Validate validates the configuration.
@@ -117,6 +121,10 @@ func (c ClickHouseAggregationConfiguration) Validate() error {
117121
errs = append(errs, fmt.Errorf("pool metrics: %w", err))
118122
}
119123

124+
if err := c.AutoMigrate.Validate(); err != nil {
125+
errs = append(errs, err)
126+
}
127+
120128
return errors.Join(errs...)
121129
}
122130

@@ -189,6 +197,52 @@ func (c ClickhousePoolMetricsConfig) Validate() error {
189197
return errors.Join(errs...)
190198
}
191199

200+
// ClickHouseAutoMigrate controls ClickHouse schema migration strategy.
201+
type ClickHouseAutoMigrate string
202+
203+
const (
204+
ClickHouseAutoMigrateMigration ClickHouseAutoMigrate = "migration"
205+
ClickHouseAutoMigrateMigrationJob ClickHouseAutoMigrate = "migration-job"
206+
ClickHouseAutoMigrateOff ClickHouseAutoMigrate = "false"
207+
)
208+
209+
func (a ClickHouseAutoMigrate) Enabled() bool {
210+
return a != "false"
211+
}
212+
213+
func (a ClickHouseAutoMigrate) Validate() error {
214+
switch a {
215+
case ClickHouseAutoMigrateMigration, ClickHouseAutoMigrateMigrationJob, ClickHouseAutoMigrateOff:
216+
return nil
217+
default:
218+
return fmt.Errorf("invalid clickhouse auto-migrate value: %q", a)
219+
}
220+
}
221+
222+
// AsMigrateURL builds a clickhouse:// URL for use with golang-migrate.
223+
func (c ClickHouseAggregationConfiguration) AsMigrateURL() string {
224+
scheme := "clickhouse"
225+
226+
params := url.Values{}
227+
params.Set("username", c.Username)
228+
params.Set("password", c.Password)
229+
params.Set("database", c.Database)
230+
params.Set("x-multi-statement", "true")
231+
params.Set("x-migrations-table-engine", "MergeTree")
232+
233+
if c.TLS {
234+
params.Set("secure", "true")
235+
}
236+
237+
u := url.URL{
238+
Scheme: scheme,
239+
Host: c.Address,
240+
RawQuery: params.Encode(),
241+
}
242+
243+
return u.String()
244+
}
245+
192246
// ConfigureAggregation configures some defaults in the Viper instance.
193247
func ConfigureAggregation(v *viper.Viper) {
194248
v.SetDefault("aggregation.eventsTableName", "om_events")
@@ -208,6 +262,9 @@ func ConfigureAggregation(v *viper.Viper) {
208262
v.SetDefault("aggregation.clickhouse.connMaxLifetime", "10m")
209263
v.SetDefault("aggregation.clickhouse.blockBufferSize", 10)
210264

265+
// Auto-migration
266+
v.SetDefault("aggregation.clickhouse.autoMigrate", "migration")
267+
211268
// Retry
212269
v.SetDefault("aggregation.clickhouse.retry.enabled", false)
213270
v.SetDefault("aggregation.clickhouse.retry.maxTries", 3)

cmd/balance-worker/main.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,10 +79,16 @@ func main() {
7979

8080
// Validate service prerequisites
8181

82-
if err := app.Migrate(ctx); err != nil {
82+
if err := app.Migrator.Migrate(ctx); err != nil {
8383
logger.Error("failed to initialize database", "error", err)
8484
os.Exit(1)
8585
}
8686

87+
// Migrate ClickHouse
88+
if err := app.ClickHouseMigrator.Migrate(ctx); err != nil {
89+
logger.Error("failed to initialize clickhouse", "error", err)
90+
os.Exit(1)
91+
}
92+
8793
app.Run()
8894
}

cmd/balance-worker/wire.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
type Application struct {
1717
common.GlobalInitializer
1818
common.Migrator
19+
common.ClickHouseMigrator
1920
common.Runner
2021

2122
Logger *slog.Logger

cmd/balance-worker/wire_gen.go

Lines changed: 12 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

cmd/billing-worker/main.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,11 +78,17 @@ func main() {
7878
logger := app.Logger
7979

8080
// Migrate database
81-
if err := app.Migrate(ctx); err != nil {
81+
if err := app.Migrator.Migrate(ctx); err != nil {
8282
logger.Error("failed to initialize database", "error", err)
8383
os.Exit(1)
8484
}
8585

86+
// Migrate ClickHouse
87+
if err := app.ClickHouseMigrator.Migrate(ctx); err != nil {
88+
logger.Error("failed to initialize clickhouse", "error", err)
89+
os.Exit(1)
90+
}
91+
8692
// Provision sandbox app
8793
err = app.AppRegistry.SandboxProvisioner(ctx, app.NamespaceManager.GetDefaultNamespace())
8894
if err != nil {

cmd/billing-worker/wire.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
type Application struct {
2020
common.GlobalInitializer
2121
common.Migrator
22+
common.ClickHouseMigrator
2223
common.Runner
2324

2425
AppRegistry common.AppRegistry

cmd/billing-worker/wire_gen.go

Lines changed: 16 additions & 10 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

cmd/jobs/internal/wire.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ import (
3838
type Application struct {
3939
common.GlobalInitializer
4040
common.Migrator
41+
common.ClickHouseMigrator
4142

4243
App app.Service
4344
AppStripe appstripe.Service

cmd/jobs/internal/wire_gen.go

Lines changed: 8 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)