Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions cli-definition.json
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@
"description": "Name of the postgres replication slot to be created by pgstream on the source url",
"default": ""
},
{
"name": "upgrade",
"description": "Clean up v0.9.x state before initializing (idempotent, safe for repeated use)",
"default": "false"
},
{
"name": "with-injector",
"description": "Whether to initialise pgstream with the injector database migrations",
Expand Down Expand Up @@ -117,6 +122,11 @@
"description": "Target URL",
"default": ""
},
{
"name": "upgrade",
"description": "Clean up v0.9.x state before initializing (idempotent, safe for repeated use; implies --init)",
"default": "false"
},
{
"name": "with-injector",
"description": "Whether to enable the injection of pgstream metadata to the WAL events. Required for search targets.",
Expand Down
10 changes: 9 additions & 1 deletion cmd/init_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,12 @@ var tearDownCmd = &cobra.Command{
}

func initDestroyFlagBinding(cmd *cobra.Command, _ []string) {
viper.BindPFlag("migrations-only", cmd.Flags().Lookup("migrations-only"))
if f := cmd.Flags().Lookup("migrations-only"); f != nil {
viper.BindPFlag("migrations-only", f)
}
if f := cmd.Flags().Lookup("upgrade"); f != nil {
viper.BindPFlag("upgrade", f)
}
// to be able to overwrite configuration with flags when yaml config file is
// provided
viper.BindPFlag("source.postgres.url", cmd.Flags().Lookup("postgres-url"))
Expand All @@ -123,5 +128,8 @@ func getInitOptions() []stream.InitOption {
if viper.GetBool("migrations-only") {
initOpts = append(initOpts, stream.WithMigrationsOnly())
}
if viper.GetBool("upgrade") {
initOpts = append(initOpts, stream.WithUpgrade())
}
return initOpts
}
2 changes: 2 additions & 0 deletions cmd/root_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func Prepare() *cobra.Command {
initCmd.Flags().String("replication-slot", "", "Name of the postgres replication slot to be created by pgstream on the source url")
initCmd.Flags().Bool("with-injector", false, "Whether to initialise pgstream with the injector database migrations")
initCmd.Flags().Bool("migrations-only", false, "Whether to only run the initialization database migrations")
initCmd.Flags().Bool("upgrade", false, "Clean up v0.9.x state before initializing (idempotent, safe for repeated use)")

// destroy cmd
destroyCmd.Flags().String("postgres-url", "", "Source postgres URL where pgstream destroy will be run")
Expand Down Expand Up @@ -85,6 +86,7 @@ func Prepare() *cobra.Command {
runCmd.Flags().Bool("reset", false, "Whether to reset the target before snapshotting (only for postgres target)")
runCmd.Flags().Bool("profile", false, "Whether to expose a /debug/pprof endpoint on localhost:6060")
runCmd.Flags().BoolVar(&initFlag, "init", false, "Whether to initialize pgstream before starting replication")
runCmd.Flags().BoolVar(&upgradeFlag, "upgrade", false, "Clean up v0.9.x state before initializing (idempotent, safe for repeated use; implies --init)")
runCmd.Flags().String("dump-file", "", "File where the pg_dump output will be written if initial snapshot is enabled")
runCmd.Flags().Bool("with-injector", false, "Whether to enable the injection of pgstream metadata to the WAL events. Required for search targets.")

Expand Down
13 changes: 12 additions & 1 deletion cmd/run_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ var runCmd = &cobra.Command{

var (
initFlag = false
upgradeFlag = false
errUnsupportedSource = errors.New("unsupported source")
errUnsupportedTarget = errors.New("unsupported target")
)
Expand Down Expand Up @@ -60,7 +61,17 @@ func run(ctx context.Context) error {
}
defer provider.Close()

return stream.Run(ctx, zerolog.NewStdLogger(logger), streamConfig, initFlag, provider.NewInstrumentation("run"))
// --upgrade implies --init
if upgradeFlag {
initFlag = true
}

var opts []stream.InitOption
if upgradeFlag {
opts = append(opts, stream.WithUpgrade())
}

return stream.Run(ctx, zerolog.NewStdLogger(logger), streamConfig, initFlag, provider.NewInstrumentation("run"), opts...)
}

func runFlagBinding(cmd *cobra.Command, args []string) error {
Expand Down
74 changes: 74 additions & 0 deletions pkg/stream/stream_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type InitConfig struct {
ReplicationSlotName string
InjectorMigrationsEnabled bool
MigrationsOnly bool
Upgrade bool
}

type InitOption func(*InitConfig)
Expand All @@ -31,6 +32,12 @@ func WithMigrationsOnly() InitOption {
}
}

func WithUpgrade() InitOption {
return func(cfg *InitConfig) {
cfg.Upgrade = true
}
}

const (
pgstreamSchema = "pgstream"
)
Expand All @@ -56,6 +63,12 @@ func Init(ctx context.Context, config *InitConfig) error {
return fmt.Errorf("failed to create pgstream schema: %w", err)
}

if config.Upgrade {
if err := cleanupV09xState(ctx, conn); err != nil {
return fmt.Errorf("failed to clean up v0.9.x state: %w", err)
}
}

migrationAssets := []*migratorlib.MigrationAssets{
migratorlib.GetCoreMigrationAssets(),
}
Expand Down Expand Up @@ -244,3 +257,64 @@ func getReplicationSlotName(pgURL string) (string, error) {
}
return pglib.DefaultReplicationSlotName(dbName), nil
}

// cleanupV09xState removes database objects that were created by v0.9.x but
// are no longer needed in v1.0. The cleanup is idempotent — all statements use
// IF EXISTS so they are safe to run concurrently or repeatedly.
func cleanupV09xState(ctx context.Context, conn *pgx.Conn) error {
// Check if v0.9.x state exists by looking for the old schema_migrations
// table (v1.0 uses schema_migrations_core/schema_migrations_injector instead).
var exists bool
err := conn.QueryRow(ctx,
`SELECT EXISTS (
SELECT 1 FROM information_schema.tables
WHERE table_schema = 'pgstream' AND table_name = 'schema_migrations'
)`).Scan(&exists)
if err != nil {
return fmt.Errorf("checking for v0.9.x state: %w", err)
}
if !exists {
return nil
}

// Drop all pgstream event triggers first (they depend on pgstream.log_schema)
rows, err := conn.Query(ctx, `SELECT evtname FROM pg_event_trigger WHERE evtname LIKE 'pgstream_%'`)
if err != nil {
return fmt.Errorf("querying event triggers: %w", err)
}
var triggers []string
for rows.Next() {
var name string
if err := rows.Scan(&name); err != nil {
rows.Close()
return fmt.Errorf("scanning event trigger name: %w", err)
}
triggers = append(triggers, name)
}
rows.Close()
if err := rows.Err(); err != nil {
return fmt.Errorf("iterating event triggers: %w", err)
}
for _, name := range triggers {
if _, err := conn.Exec(ctx, fmt.Sprintf("DROP EVENT TRIGGER IF EXISTS %s", name)); err != nil {
return fmt.Errorf("dropping event trigger %s: %w", name, err)
}
}

// v0.9.x objects that are not present in v1.0
cleanupStatements := []string{
"DROP FUNCTION IF EXISTS pgstream.log_schema()",
"DROP FUNCTION IF EXISTS pgstream.get_schema(text)",
"DROP FUNCTION IF EXISTS pgstream.refresh_schema()",
"DROP TABLE IF EXISTS pgstream.schema_log",
"DROP TABLE IF EXISTS pgstream.schema_migrations",
}

for _, stmt := range cleanupStatements {
if _, err := conn.Exec(ctx, stmt); err != nil {
return fmt.Errorf("executing %q: %w", stmt, err)
}
}

return nil
}
Loading