diff --git a/cmd/config/config_env.go b/cmd/config/config_env.go index 6f3316c3..45da75ab 100644 --- a/cmd/config/config_env.go +++ b/cmd/config/config_env.go @@ -213,7 +213,9 @@ func parsePostgresListenerConfig() (*stream.PostgresListenerConfig, error) { PostgresURL: pgURL, ReplicationSlotName: viper.GetString("PGSTREAM_POSTGRES_REPLICATION_SLOT_NAME"), PluginArguments: pgreplication.PluginArguments{ - IncludeXIDs: viper.GetBool("PGSTREAM_POSTGRES_REPLICATION_PLUGIN_INCLUDE_XIDS"), + IncludeXIDs: viper.GetBool("PGSTREAM_POSTGRES_REPLICATION_PLUGIN_INCLUDE_XIDS"), + AddTables: viper.GetString("PGSTREAM_POSTGRES_REPLICATION_PLUGIN_ADD_TABLES"), + FilterTables: viper.GetString("PGSTREAM_POSTGRES_REPLICATION_PLUGIN_FILTER_TABLES"), }, }, RetryPolicy: parseBackoffConfig("PGSTREAM_POSTGRES_LISTENER"), diff --git a/cmd/config/config_yaml.go b/cmd/config/config_yaml.go index 6d2e709c..ef88e9b2 100644 --- a/cmd/config/config_yaml.go +++ b/cmd/config/config_yaml.go @@ -116,7 +116,9 @@ type ReplicationConfig struct { } type PluginConfig struct { - IncludeXIDs bool `mapstructure:"include_xids" yaml:"include_xids"` + IncludeXIDs bool `mapstructure:"include_xids" yaml:"include_xids"` + AddTables string `mapstructure:"add_tables" yaml:"add_tables"` + FilterTables string `mapstructure:"filter_tables" yaml:"filter_tables"` } type KafkaConfig struct { @@ -429,6 +431,8 @@ func (c *YAMLConfig) parsePostgresListenerConfig() (*stream.PostgresListenerConf replicationSlotName = c.Source.Postgres.Replication.ReplicationSlot if c.Source.Postgres.Replication.Plugin != nil { pluginArgs.IncludeXIDs = c.Source.Postgres.Replication.Plugin.IncludeXIDs + pluginArgs.AddTables = c.Source.Postgres.Replication.Plugin.AddTables + pluginArgs.FilterTables = c.Source.Postgres.Replication.Plugin.FilterTables } } streamCfg.Replication = pgreplication.Config{ diff --git a/pkg/wal/replication/postgres/pg_replication_handler.go b/pkg/wal/replication/postgres/pg_replication_handler.go index 23c49810..f53fca33 100644 --- a/pkg/wal/replication/postgres/pg_replication_handler.go +++ b/pkg/wal/replication/postgres/pg_replication_handler.go @@ -47,7 +47,9 @@ type Config struct { } type PluginArguments struct { - IncludeXIDs bool + IncludeXIDs bool + AddTables string // wal2json add-tables option (e.g., "public.*") + FilterTables string // wal2json filter-tables option (e.g., "pipelines.*,private.*") } type Option func(h *Handler) @@ -110,6 +112,12 @@ func NewHandler(ctx context.Context, cfg Config, opts ...Option) (*Handler, erro if cfg.PluginArguments.IncludeXIDs { h.pluginArguments = append(h.pluginArguments, `"include-xids" '1'`) } + if cfg.PluginArguments.AddTables != "" { + h.pluginArguments = append(h.pluginArguments, fmt.Sprintf(`"add-tables" '%s'`, cfg.PluginArguments.AddTables)) + } + if cfg.PluginArguments.FilterTables != "" { + h.pluginArguments = append(h.pluginArguments, fmt.Sprintf(`"filter-tables" '%s'`, cfg.PluginArguments.FilterTables)) + } if len(cfg.IncludeTables) > 0 { h.includedTables, err = pglib.NewSchemaTableMap(cfg.IncludeTables)