From 9295a7e9af3cf6d04ee80a68441451af9ab19fd0 Mon Sep 17 00:00:00 2001 From: Blake Watters Date: Sat, 11 Apr 2026 12:01:43 -0400 Subject: [PATCH] Add add_tables and filter_tables wal2json plugin options MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Allows passing wal2json's add-tables and filter-tables options via pgstream config (YAML and env vars). These filter at the source-side decode level, preventing wal2json from generating JSON for excluded tables — dramatically reducing CPU and network overhead. Config: source.postgres.replication.plugin.add_tables: "public.*" source.postgres.replication.plugin.filter_tables: "pipelines.*,private.*" Env vars: PGSTREAM_POSTGRES_REPLICATION_PLUGIN_ADD_TABLES PGSTREAM_POSTGRES_REPLICATION_PLUGIN_FILTER_TABLES --- cmd/config/config_env.go | 4 +++- cmd/config/config_yaml.go | 6 +++++- pkg/wal/replication/postgres/pg_replication_handler.go | 10 +++++++++- 3 files changed, 17 insertions(+), 3 deletions(-) diff --git a/cmd/config/config_env.go b/cmd/config/config_env.go index 6f3316c3..45da75ab 100644 --- a/cmd/config/config_env.go +++ b/cmd/config/config_env.go @@ -213,7 +213,9 @@ func parsePostgresListenerConfig() (*stream.PostgresListenerConfig, error) { PostgresURL: pgURL, ReplicationSlotName: viper.GetString("PGSTREAM_POSTGRES_REPLICATION_SLOT_NAME"), PluginArguments: pgreplication.PluginArguments{ - IncludeXIDs: viper.GetBool("PGSTREAM_POSTGRES_REPLICATION_PLUGIN_INCLUDE_XIDS"), + IncludeXIDs: viper.GetBool("PGSTREAM_POSTGRES_REPLICATION_PLUGIN_INCLUDE_XIDS"), + AddTables: viper.GetString("PGSTREAM_POSTGRES_REPLICATION_PLUGIN_ADD_TABLES"), + FilterTables: viper.GetString("PGSTREAM_POSTGRES_REPLICATION_PLUGIN_FILTER_TABLES"), }, }, RetryPolicy: parseBackoffConfig("PGSTREAM_POSTGRES_LISTENER"), diff --git a/cmd/config/config_yaml.go b/cmd/config/config_yaml.go index 6d2e709c..ef88e9b2 100644 --- a/cmd/config/config_yaml.go +++ b/cmd/config/config_yaml.go @@ -116,7 +116,9 @@ type ReplicationConfig struct { } type PluginConfig struct { - IncludeXIDs bool `mapstructure:"include_xids" yaml:"include_xids"` + IncludeXIDs bool `mapstructure:"include_xids" yaml:"include_xids"` + AddTables string `mapstructure:"add_tables" yaml:"add_tables"` + FilterTables string `mapstructure:"filter_tables" yaml:"filter_tables"` } type KafkaConfig struct { @@ -429,6 +431,8 @@ func (c *YAMLConfig) parsePostgresListenerConfig() (*stream.PostgresListenerConf replicationSlotName = c.Source.Postgres.Replication.ReplicationSlot if c.Source.Postgres.Replication.Plugin != nil { pluginArgs.IncludeXIDs = c.Source.Postgres.Replication.Plugin.IncludeXIDs + pluginArgs.AddTables = c.Source.Postgres.Replication.Plugin.AddTables + pluginArgs.FilterTables = c.Source.Postgres.Replication.Plugin.FilterTables } } streamCfg.Replication = pgreplication.Config{ diff --git a/pkg/wal/replication/postgres/pg_replication_handler.go b/pkg/wal/replication/postgres/pg_replication_handler.go index 23c49810..f53fca33 100644 --- a/pkg/wal/replication/postgres/pg_replication_handler.go +++ b/pkg/wal/replication/postgres/pg_replication_handler.go @@ -47,7 +47,9 @@ type Config struct { } type PluginArguments struct { - IncludeXIDs bool + IncludeXIDs bool + AddTables string // wal2json add-tables option (e.g., "public.*") + FilterTables string // wal2json filter-tables option (e.g., "pipelines.*,private.*") } type Option func(h *Handler) @@ -110,6 +112,12 @@ func NewHandler(ctx context.Context, cfg Config, opts ...Option) (*Handler, erro if cfg.PluginArguments.IncludeXIDs { h.pluginArguments = append(h.pluginArguments, `"include-xids" '1'`) } + if cfg.PluginArguments.AddTables != "" { + h.pluginArguments = append(h.pluginArguments, fmt.Sprintf(`"add-tables" '%s'`, cfg.PluginArguments.AddTables)) + } + if cfg.PluginArguments.FilterTables != "" { + h.pluginArguments = append(h.pluginArguments, fmt.Sprintf(`"filter-tables" '%s'`, cfg.PluginArguments.FilterTables)) + } if len(cfg.IncludeTables) > 0 { h.includedTables, err = pglib.NewSchemaTableMap(cfg.IncludeTables)