Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions app/common/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,54 @@ import (

"github.com/openmeterio/openmeter/app/config"
"github.com/openmeterio/openmeter/pkg/framework/clickhouseotel"
"github.com/openmeterio/openmeter/tools/migrate"
)

var ClickHouse = wire.NewSet(
NewClickHouse,
wire.Struct(new(ClickHouseMigrator), "*"),
)

// ClickHouseMigrator executes ClickHouse schema migrations.
type ClickHouseMigrator struct {
Config config.ClickHouseAggregationConfiguration
Logger *slog.Logger
}

func (m ClickHouseMigrator) Migrate(ctx context.Context) error {
if !m.Config.AutoMigrate.Enabled() {
m.Logger.Debug("clickhouse auto migration is disabled")
return nil
}

m.Logger.Debug("running clickhouse migrations", slog.String("strategy", string(m.Config.AutoMigrate)))

migrator, err := migrate.New(migrate.MigrateOptions{
ConnectionString: m.Config.AsMigrateURL(),
Migrations: migrate.ClickHouseMigrationsConfig,
Logger: m.Logger,
})
if err != nil {
return fmt.Errorf("failed to create clickhouse migrator: %w", err)
}
defer migrator.CloseOrLogError()

switch m.Config.AutoMigrate {
case config.ClickHouseAutoMigrateMigration:
if err := migrator.Up(); err != nil {
return fmt.Errorf("failed to migrate clickhouse: %w", err)
}
case config.ClickHouseAutoMigrateMigrationJob:
if err := migrator.WaitForMigrationJob(); err != nil {
return fmt.Errorf("failed to wait for clickhouse migration job: %w", err)
}
}

m.Logger.Info("clickhouse database initialized")

return nil
}

func NewClickHouse(ctx context.Context, conf config.ClickHouseAggregationConfiguration, tracer trace.Tracer, meter metric.Meter, logger *slog.Logger) (clickhouse.Conn, func(), error) {
noopClose := func() {}

Expand Down
57 changes: 57 additions & 0 deletions app/config/aggregation.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"crypto/tls"
"errors"
"fmt"
"net/url"
"time"

"github.com/ClickHouse/clickhouse-go/v2"
Expand Down Expand Up @@ -79,6 +80,9 @@ type ClickHouseAggregationConfiguration struct {
PoolMetrics ClickhousePoolMetricsConfig

Retry ClickhouseQueryRetryConfig

// AutoMigrate controls whether ClickHouse schema migrations run automatically on startup.
AutoMigrate ClickHouseAutoMigrate `yaml:"autoMigrate"`
}

// Validate validates the configuration.
Expand Down Expand Up @@ -117,6 +121,10 @@ func (c ClickHouseAggregationConfiguration) Validate() error {
errs = append(errs, fmt.Errorf("pool metrics: %w", err))
}

if err := c.AutoMigrate.Validate(); err != nil {
errs = append(errs, err)
}

return errors.Join(errs...)
}

Expand Down Expand Up @@ -189,6 +197,52 @@ func (c ClickhousePoolMetricsConfig) Validate() error {
return errors.Join(errs...)
}

// ClickHouseAutoMigrate controls ClickHouse schema migration strategy.
type ClickHouseAutoMigrate string

const (
ClickHouseAutoMigrateMigration ClickHouseAutoMigrate = "migration"
ClickHouseAutoMigrateMigrationJob ClickHouseAutoMigrate = "migration-job"
ClickHouseAutoMigrateOff ClickHouseAutoMigrate = "false"
)

func (a ClickHouseAutoMigrate) Enabled() bool {
return a != "false"
}

func (a ClickHouseAutoMigrate) Validate() error {
switch a {
case ClickHouseAutoMigrateMigration, ClickHouseAutoMigrateMigrationJob, ClickHouseAutoMigrateOff:
return nil
default:
return fmt.Errorf("invalid clickhouse auto-migrate value: %q", a)
}
}

// AsMigrateURL builds a clickhouse:// URL for use with golang-migrate.
func (c ClickHouseAggregationConfiguration) AsMigrateURL() string {
scheme := "clickhouse"

params := url.Values{}
params.Set("username", c.Username)
params.Set("password", c.Password)
params.Set("database", c.Database)
params.Set("x-multi-statement", "true")
params.Set("x-migrations-table-engine", "MergeTree")

if c.TLS {
params.Set("secure", "true")
}

u := url.URL{
Scheme: scheme,
Host: c.Address,
RawQuery: params.Encode(),
}

return u.String()
}

// ConfigureAggregation configures some defaults in the Viper instance.
func ConfigureAggregation(v *viper.Viper) {
v.SetDefault("aggregation.eventsTableName", "om_events")
Expand All @@ -208,6 +262,9 @@ func ConfigureAggregation(v *viper.Viper) {
v.SetDefault("aggregation.clickhouse.connMaxLifetime", "10m")
v.SetDefault("aggregation.clickhouse.blockBufferSize", 10)

// Auto-migration
v.SetDefault("aggregation.clickhouse.autoMigrate", "migration")

// Retry
v.SetDefault("aggregation.clickhouse.retry.enabled", false)
v.SetDefault("aggregation.clickhouse.retry.maxTries", 3)
Expand Down
8 changes: 7 additions & 1 deletion cmd/balance-worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,16 @@ func main() {

// Validate service prerequisites

if err := app.Migrate(ctx); err != nil {
if err := app.Migrator.Migrate(ctx); err != nil {
logger.Error("failed to initialize database", "error", err)
os.Exit(1)
}

// Migrate ClickHouse
if err := app.ClickHouseMigrator.Migrate(ctx); err != nil {
logger.Error("failed to initialize clickhouse", "error", err)
os.Exit(1)
}

app.Run()
}
1 change: 1 addition & 0 deletions cmd/balance-worker/wire.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
type Application struct {
common.GlobalInitializer
common.Migrator
common.ClickHouseMigrator
common.Runner

Logger *slog.Logger
Expand Down
18 changes: 12 additions & 6 deletions cmd/balance-worker/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 7 additions & 1 deletion cmd/billing-worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,17 @@ func main() {
logger := app.Logger

// Migrate database
if err := app.Migrate(ctx); err != nil {
if err := app.Migrator.Migrate(ctx); err != nil {
logger.Error("failed to initialize database", "error", err)
os.Exit(1)
}

// Migrate ClickHouse
if err := app.ClickHouseMigrator.Migrate(ctx); err != nil {
logger.Error("failed to initialize clickhouse", "error", err)
os.Exit(1)
}

// Provision sandbox app
err = app.AppRegistry.SandboxProvisioner(ctx, app.NamespaceManager.GetDefaultNamespace())
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions cmd/billing-worker/wire.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
type Application struct {
common.GlobalInitializer
common.Migrator
common.ClickHouseMigrator
common.Runner

AppRegistry common.AppRegistry
Expand Down
26 changes: 16 additions & 10 deletions cmd/billing-worker/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions cmd/jobs/internal/wire.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
type Application struct {
common.GlobalInitializer
common.Migrator
common.ClickHouseMigrator

App app.Service
AppStripe appstripe.Service
Expand Down
10 changes: 8 additions & 2 deletions cmd/jobs/internal/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading