diff --git a/cli-definition.json b/cli-definition.json
index e37567e5..a1eab73a 100644
--- a/cli-definition.json
+++ b/cli-definition.json
@@ -52,6 +52,11 @@
         "description": "Name of the postgres replication slot to be created by pgstream on the source url",
         "default": ""
       },
+      {
+        "name": "upgrade",
+        "description": "Clean up v0.9.x state before initializing (idempotent, safe for repeated use)",
+        "default": "false"
+      },
       {
         "name": "with-injector",
         "description": "Whether to initialise pgstream with the injector database migrations",
@@ -117,6 +122,11 @@
         "description": "Target URL",
         "default": ""
       },
+      {
+        "name": "upgrade",
+        "description": "Clean up v0.9.x state before initializing (idempotent, safe for repeated use; implies --init)",
+        "default": "false"
+      },
       {
         "name": "with-injector",
         "description": "Whether to enable the injection of pgstream metadata to the WAL events. Required for search targets.",
diff --git a/cmd/init_cmd.go b/cmd/init_cmd.go
index 51c7f8bb..e37f2402 100644
--- a/cmd/init_cmd.go
+++ b/cmd/init_cmd.go
@@ -100,7 +100,12 @@ var tearDownCmd = &cobra.Command{
 }
 
 func initDestroyFlagBinding(cmd *cobra.Command, _ []string) {
-	viper.BindPFlag("migrations-only", cmd.Flags().Lookup("migrations-only"))
+	if f := cmd.Flags().Lookup("migrations-only"); f != nil {
+		viper.BindPFlag("migrations-only", f)
+	}
+	if f := cmd.Flags().Lookup("upgrade"); f != nil {
+		viper.BindPFlag("upgrade", f)
+	}
 	// to be able to overwrite configuration with flags when yaml config file is
 	// provided
 	viper.BindPFlag("source.postgres.url", cmd.Flags().Lookup("postgres-url"))
@@ -123,5 +128,8 @@
 	if viper.GetBool("migrations-only") {
 		initOpts = append(initOpts, stream.WithMigrationsOnly())
 	}
+	if viper.GetBool("upgrade") {
+		initOpts = append(initOpts, stream.WithUpgrade())
+	}
 	return initOpts
 }
diff --git a/cmd/root_cmd.go b/cmd/root_cmd.go
index 7d800adb..1aff6781 100644
--- a/cmd/root_cmd.go
+++ b/cmd/root_cmd.go
@@ -53,6 +53,7 @@ func Prepare() *cobra.Command {
 	initCmd.Flags().String("replication-slot", "", "Name of the postgres replication slot to be created by pgstream on the source url")
 	initCmd.Flags().Bool("with-injector", false, "Whether to initialise pgstream with the injector database migrations")
 	initCmd.Flags().Bool("migrations-only", false, "Whether to only run the initialization database migrations")
+	initCmd.Flags().Bool("upgrade", false, "Clean up v0.9.x state before initializing (idempotent, safe for repeated use)")
 
 	// destroy cmd
 	destroyCmd.Flags().String("postgres-url", "", "Source postgres URL where pgstream destroy will be run")
@@ -85,5 +86,6 @@ func Prepare() *cobra.Command {
 	runCmd.Flags().Bool("reset", false, "Whether to reset the target before snapshotting (only for postgres target)")
 	runCmd.Flags().Bool("profile", false, "Whether to expose a /debug/pprof endpoint on localhost:6060")
 	runCmd.Flags().BoolVar(&initFlag, "init", false, "Whether to initialize pgstream before starting replication")
+	runCmd.Flags().BoolVar(&upgradeFlag, "upgrade", false, "Clean up v0.9.x state before initializing (idempotent, safe for repeated use; implies --init)")
 	runCmd.Flags().String("dump-file", "", "File where the pg_dump output will be written if initial snapshot is enabled")
 	runCmd.Flags().Bool("with-injector", false, "Whether to enable the injection of pgstream metadata to the WAL events. Required for search targets.")
diff --git a/cmd/run_cmd.go b/cmd/run_cmd.go
index 5c25a592..298d8980 100644
--- a/cmd/run_cmd.go
+++ b/cmd/run_cmd.go
@@ -30,6 +30,7 @@ var runCmd = &cobra.Command{
 
 var (
 	initFlag             = false
+	upgradeFlag          = false
 	errUnsupportedSource = errors.New("unsupported source")
 	errUnsupportedTarget = errors.New("unsupported target")
 )
@@ -60,7 +61,14 @@ func run(ctx context.Context) error {
 	}
 	defer provider.Close()
 
-	return stream.Run(ctx, zerolog.NewStdLogger(logger), streamConfig, initFlag, provider.NewInstrumentation("run"))
+	var opts []stream.InitOption
+	if upgradeFlag {
+		// --upgrade implies --init
+		initFlag = true
+		opts = append(opts, stream.WithUpgrade())
+	}
+
+	return stream.Run(ctx, zerolog.NewStdLogger(logger), streamConfig, initFlag, provider.NewInstrumentation("run"), opts...)
 }
 
 func runFlagBinding(cmd *cobra.Command, args []string) error {
diff --git a/pkg/stream/stream_init.go b/pkg/stream/stream_init.go
index a5e75671..2e0abcae 100644
--- a/pkg/stream/stream_init.go
+++ b/pkg/stream/stream_init.go
@@ -21,6 +21,7 @@ type InitConfig struct {
 	ReplicationSlotName       string
 	InjectorMigrationsEnabled bool
 	MigrationsOnly            bool
+	Upgrade                   bool
 }
 
 type InitOption func(*InitConfig)
@@ -31,6 +32,13 @@
 	}
 }
 
+// WithUpgrade enables cleanup of v0.9.x state before the initialization runs.
+func WithUpgrade() InitOption {
+	return func(cfg *InitConfig) {
+		cfg.Upgrade = true
+	}
+}
+
 const (
 	pgstreamSchema = "pgstream"
 )
@@ -56,6 +63,12 @@ func Init(ctx context.Context, config *InitConfig) error {
 		return fmt.Errorf("failed to create pgstream schema: %w", err)
 	}
 
+	if config.Upgrade {
+		if err := cleanupV09xState(ctx, conn); err != nil {
+			return fmt.Errorf("failed to clean up v0.9.x state: %w", err)
+		}
+	}
+
 	migrationAssets := []*migratorlib.MigrationAssets{
 		migratorlib.GetCoreMigrationAssets(),
 	}
@@ -244,3 +257,64 @@ func getReplicationSlotName(pgURL string) (string, error) {
 	}
 	return pglib.DefaultReplicationSlotName(dbName), nil
 }
+
+// cleanupV09xState removes database objects that were created by v0.9.x but
+// are no longer needed in v1.0. The cleanup is idempotent — all statements use
+// IF EXISTS so they are safe to run repeatedly.
+func cleanupV09xState(ctx context.Context, conn *pgx.Conn) error {
+	// Check if v0.9.x state exists by looking for the old schema_migrations
+	// table (v1.0 uses schema_migrations_core/schema_migrations_injector instead).
+	var exists bool
+	err := conn.QueryRow(ctx,
+		`SELECT EXISTS (
+			SELECT 1 FROM information_schema.tables
+			WHERE table_schema = 'pgstream' AND table_name = 'schema_migrations'
+		)`).Scan(&exists)
+	if err != nil {
+		return fmt.Errorf("checking for v0.9.x state: %w", err)
+	}
+	if !exists {
+		return nil
+	}
+
+	// Drop all pgstream event triggers first (they depend on pgstream.log_schema)
+	rows, err := conn.Query(ctx, `SELECT evtname FROM pg_event_trigger WHERE evtname LIKE 'pgstream_%'`)
+	if err != nil {
+		return fmt.Errorf("querying event triggers: %w", err)
+	}
+	var triggers []string
+	for rows.Next() {
+		var name string
+		if err := rows.Scan(&name); err != nil {
+			rows.Close()
+			return fmt.Errorf("scanning event trigger name: %w", err)
+		}
+		triggers = append(triggers, name)
+	}
+	rows.Close()
+	if err := rows.Err(); err != nil {
+		return fmt.Errorf("iterating event triggers: %w", err)
+	}
+	for _, name := range triggers {
+		if _, err := conn.Exec(ctx, "DROP EVENT TRIGGER IF EXISTS "+pgx.Identifier{name}.Sanitize()); err != nil {
+			return fmt.Errorf("dropping event trigger %s: %w", name, err)
+		}
+	}
+
+	// v0.9.x objects that are not present in v1.0
+	cleanupStatements := []string{
+		"DROP FUNCTION IF EXISTS pgstream.log_schema()",
+		"DROP FUNCTION IF EXISTS pgstream.get_schema(text)",
+		"DROP FUNCTION IF EXISTS pgstream.refresh_schema()",
+		"DROP TABLE IF EXISTS pgstream.schema_log",
+		"DROP TABLE IF EXISTS pgstream.schema_migrations",
+	}
+
+	for _, stmt := range cleanupStatements {
+		if _, err := conn.Exec(ctx, stmt); err != nil {
+			return fmt.Errorf("executing %q: %w", stmt, err)
+		}
+	}
+
+	return nil
+}
diff --git a/pkg/stream/stream_init_test.go b/pkg/stream/stream_init_test.go
new file mode 100644
index 00000000..6ef4cc07
--- /dev/null
+++ b/pkg/stream/stream_init_test.go
@@ -0,0 +1,239 @@
+// SPDX-License-Identifier: Apache-2.0
+
+package stream
+
+import (
+	"context"
+	"os"
+	"testing"
+
+	"github.com/jackc/pgx/v5"
+	"github.com/stretchr/testify/require"
+	"github.com/xataio/pgstream/internal/testcontainers"
+)
+
+var testPGURL string
+
+func TestMain(m *testing.M) {
+	if os.Getenv("PGSTREAM_INTEGRATION_TESTS") != "" {
+		ctx := context.Background()
+		cleanup, err := testcontainers.SetupPostgresContainer(ctx, &testPGURL, testcontainers.Postgres14, "../../pkg/stream/integration/config/postgresql.conf")
+		if err != nil {
+			panic(err)
+		}
+		// os.Exit skips deferred calls, so run cleanup explicitly before exiting.
+		code := m.Run()
+		cleanup()
+		os.Exit(code)
+	}
+
+	os.Exit(m.Run())
+}
+
+func TestInit_Upgrade_WithV09xState(t *testing.T) {
+	if os.Getenv("PGSTREAM_INTEGRATION_TESTS") == "" {
+		t.Skip("skipping integration test")
+	}
+
+	ctx := context.Background()
+
+	// Set up v0.9.x state
+	conn, err := pgx.Connect(ctx, testPGURL)
+	require.NoError(t, err)
+	defer conn.Close(ctx)
+
+	setupV09xState(t, ctx, conn)
+
+	// Run Init with Upgrade=true and MigrationsOnly=true (to avoid
+	// needing wal2json for replication slot creation)
+	err = Init(ctx, &InitConfig{
+		PostgresURL:    testPGURL,
+		Upgrade:        true,
+		MigrationsOnly: true,
+	})
+	require.NoError(t, err)
+
+	// Verify v0.9.x objects are removed
+	assertObjectNotExists(t, ctx, conn, "table", "pgstream", "schema_log")
+	assertObjectNotExists(t, ctx, conn, "table", "pgstream", "schema_migrations")
+
+	// Verify v1.0 migration tables exist
+	assertObjectExists(t, ctx, conn, "table", "pgstream", "schema_migrations_core")
+}
+
+func TestInit_Upgrade_NoV09xState(t *testing.T) {
+	if os.Getenv("PGSTREAM_INTEGRATION_TESTS") == "" {
+		t.Skip("skipping integration test")
+	}
+
+	ctx := context.Background()
+
+	// Clean up any prior state
+	conn, err := pgx.Connect(ctx, testPGURL)
+	require.NoError(t, err)
+	cleanupAllState(t, ctx, conn)
+	conn.Close(ctx)
+
+	// Run Init with Upgrade=true on a clean DB
+	err = Init(ctx, &InitConfig{
+		PostgresURL:    testPGURL,
+		Upgrade:        true,
+		MigrationsOnly: true,
+	})
+	require.NoError(t, err)
+
+	conn, err = pgx.Connect(ctx, testPGURL)
+	require.NoError(t, err)
+	defer conn.Close(ctx)
+
+	// Verify v1.0 state was created
+	assertObjectExists(t, ctx, conn, "table", "pgstream", "schema_migrations_core")
+}
+
+func TestInit_Upgrade_Idempotent(t *testing.T) {
+	if os.Getenv("PGSTREAM_INTEGRATION_TESTS") == "" {
+		t.Skip("skipping integration test")
+	}
+
+	ctx := context.Background()
+
+	// Clean up any prior state
+	conn, err := pgx.Connect(ctx, testPGURL)
+	require.NoError(t, err)
+	cleanupAllState(t, ctx, conn)
+	conn.Close(ctx)
+
+	cfg := &InitConfig{
+		PostgresURL:    testPGURL,
+		Upgrade:        true,
+		MigrationsOnly: true,
+	}
+
+	// First run
+	err = Init(ctx, cfg)
+	require.NoError(t, err)
+
+	// Second run — should be a no-op, no errors
+	err = Init(ctx, cfg)
+	require.NoError(t, err)
+
+	conn, err = pgx.Connect(ctx, testPGURL)
+	require.NoError(t, err)
+	defer conn.Close(ctx)
+
+	assertObjectExists(t, ctx, conn, "table", "pgstream", "schema_migrations_core")
+}
+
+func TestInit_Upgrade_WithInjector(t *testing.T) {
+	if os.Getenv("PGSTREAM_INTEGRATION_TESTS") == "" {
+		t.Skip("skipping integration test")
+	}
+
+	ctx := context.Background()
+
+	// Clean up any prior state and set up v0.9.x state
+	conn, err := pgx.Connect(ctx, testPGURL)
+	require.NoError(t, err)
+	cleanupAllState(t, ctx, conn)
+	setupV09xState(t, ctx, conn)
+
+	// Insert test data into table_ids (should be preserved across upgrade)
+	_, err = conn.Exec(ctx, `CREATE TABLE IF NOT EXISTS pgstream.table_ids (id text PRIMARY KEY)`)
+	require.NoError(t, err)
+	_, err = conn.Exec(ctx, `INSERT INTO pgstream.table_ids (id) VALUES ('test-id') ON CONFLICT DO NOTHING`)
+	require.NoError(t, err)
+	conn.Close(ctx)
+
+	// Run Init with Upgrade and injector enabled
+	err = Init(ctx, &InitConfig{
+		PostgresURL:               testPGURL,
+		Upgrade:                   true,
+		MigrationsOnly:            true,
+		InjectorMigrationsEnabled: true,
+	})
+	require.NoError(t, err)
+
+	conn, err = pgx.Connect(ctx, testPGURL)
+	require.NoError(t, err)
+	defer conn.Close(ctx)
+
+	// Verify v0.9.x objects are removed
+	assertObjectNotExists(t, ctx, conn, "table", "pgstream", "schema_log")
+	assertObjectNotExists(t, ctx, conn, "table", "pgstream", "schema_migrations")
+
+	// Verify injector migration table exists
+	assertObjectExists(t, ctx, conn, "table", "pgstream", "schema_migrations_injector")
+
+	// Verify table_ids data was preserved
+	var id string
+	err = conn.QueryRow(ctx, `SELECT id FROM pgstream.table_ids WHERE id = 'test-id'`).Scan(&id)
+	require.NoError(t, err)
+	require.Equal(t, "test-id", id)
+}
+
+func TestWithUpgrade(t *testing.T) {
+	t.Parallel()
+
+	cfg := &InitConfig{}
+	WithUpgrade()(cfg)
+	require.True(t, cfg.Upgrade)
+}
+
+// setupV09xState creates the database objects that v0.9.x would have created.
+func setupV09xState(t *testing.T, ctx context.Context, conn *pgx.Conn) {
+	t.Helper()
+
+	statements := []string{
+		"CREATE SCHEMA IF NOT EXISTS pgstream",
+		"CREATE TABLE IF NOT EXISTS pgstream.schema_migrations (version bigint PRIMARY KEY, dirty boolean NOT NULL)",
+		"CREATE TABLE IF NOT EXISTS pgstream.schema_log (id serial PRIMARY KEY, version text)",
+		`CREATE OR REPLACE FUNCTION pgstream.log_schema() RETURNS event_trigger AS $$ BEGIN END; $$ LANGUAGE plpgsql`,
+		`CREATE OR REPLACE FUNCTION pgstream.get_schema(p_name text) RETURNS text AS $$ BEGIN RETURN ''; END; $$ LANGUAGE plpgsql`,
+		`CREATE OR REPLACE FUNCTION pgstream.refresh_schema() RETURNS void AS $$ BEGIN END; $$ LANGUAGE plpgsql`,
+		`DROP EVENT TRIGGER IF EXISTS pgstream_log_schema_create_alter_table`,
+		`CREATE EVENT TRIGGER pgstream_log_schema_create_alter_table ON ddl_command_end EXECUTE FUNCTION pgstream.log_schema()`,
+		`DROP EVENT TRIGGER IF EXISTS pgstream_log_schema_drop_schema_table`,
+		`CREATE EVENT TRIGGER pgstream_log_schema_drop_schema_table ON sql_drop EXECUTE FUNCTION pgstream.log_schema()`,
+	}
+
+	for _, stmt := range statements {
+		_, err := conn.Exec(ctx, stmt)
+		require.NoError(t, err, "failed to execute: %s", stmt)
+	}
+}
+
+// cleanupAllState drops the pgstream schema and any event triggers.
+func cleanupAllState(t *testing.T, ctx context.Context, conn *pgx.Conn) {
+	t.Helper()
+
+	statements := []string{
+		"DROP EVENT TRIGGER IF EXISTS pgstream_log_schema_create_alter_table",
+		"DROP EVENT TRIGGER IF EXISTS pgstream_log_schema_drop_schema_table",
+		"DROP SCHEMA IF EXISTS pgstream CASCADE",
+	}
+	for _, stmt := range statements {
+		_, err := conn.Exec(ctx, stmt)
+		require.NoError(t, err)
+	}
+}
+
+func assertObjectExists(t *testing.T, ctx context.Context, conn *pgx.Conn, objectType, schema, name string) {
+	t.Helper()
+
+	var exists bool
+	err := conn.QueryRow(ctx,
+		`SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_schema = $1 AND table_name = $2)`,
+		schema, name).Scan(&exists)
+	require.NoError(t, err)
+	require.True(t, exists, "%s %s.%s should exist", objectType, schema, name)
+}
+
+func assertObjectNotExists(t *testing.T, ctx context.Context, conn *pgx.Conn, objectType, schema, name string) {
+	t.Helper()
+
+	var exists bool
+	err := conn.QueryRow(ctx,
+		`SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_schema = $1 AND table_name = $2)`,
+		schema, name).Scan(&exists)
+	require.NoError(t, err)
+	require.False(t, exists, "%s %s.%s should not exist", objectType, schema, name)
+}
diff --git a/pkg/stream/stream_run.go b/pkg/stream/stream_run.go
index 0869bf04..7a37d866 100644
--- a/pkg/stream/stream_run.go
+++ b/pkg/stream/stream_run.go
@@ -28,13 +28,13 @@ import (
 )
 
 // Run will run the configured pgstream processes. This call is blocking.
-func Run(ctx context.Context, logger loglib.Logger, config *Config, init bool, instrumentation *otel.Instrumentation) error {
+func Run(ctx context.Context, logger loglib.Logger, config *Config, init bool, instrumentation *otel.Instrumentation, opts ...InitOption) error {
 	if err := config.IsValid(); err != nil {
 		return fmt.Errorf("incompatible configuration: %w", err)
 	}
 
 	if init {
-		if err := Init(ctx, config.GetInitConfig()); err != nil {
+		if err := Init(ctx, config.GetInitConfig(opts...)); err != nil {
 			return err
 		}
 	}