Skip to content

Commit 1dc7cc1

Browse files
authored
Add add_tables and filter_tables wal2json plugin options (#791)
Allows passing wal2json's add-tables and filter-tables options via pgstream config (YAML and env vars). These filter at the source-side decode level, preventing wal2json from generating JSON for excluded tables — dramatically reducing CPU and network overhead. Config: source.postgres.replication.plugin.add_tables: "public.*" source.postgres.replication.plugin.filter_tables: "pipelines.*,private.*" Env vars: PGSTREAM_POSTGRES_REPLICATION_PLUGIN_ADD_TABLES PGSTREAM_POSTGRES_REPLICATION_PLUGIN_FILTER_TABLES
1 parent 59a723d commit 1dc7cc1

3 files changed

Lines changed: 17 additions & 3 deletions

File tree

cmd/config/config_env.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,9 @@ func parsePostgresListenerConfig() (*stream.PostgresListenerConfig, error) {
213213
PostgresURL: pgURL,
214214
ReplicationSlotName: viper.GetString("PGSTREAM_POSTGRES_REPLICATION_SLOT_NAME"),
215215
PluginArguments: pgreplication.PluginArguments{
216-
IncludeXIDs: viper.GetBool("PGSTREAM_POSTGRES_REPLICATION_PLUGIN_INCLUDE_XIDS"),
216+
IncludeXIDs: viper.GetBool("PGSTREAM_POSTGRES_REPLICATION_PLUGIN_INCLUDE_XIDS"),
217+
AddTables: viper.GetString("PGSTREAM_POSTGRES_REPLICATION_PLUGIN_ADD_TABLES"),
218+
FilterTables: viper.GetString("PGSTREAM_POSTGRES_REPLICATION_PLUGIN_FILTER_TABLES"),
217219
},
218220
},
219221
RetryPolicy: parseBackoffConfig("PGSTREAM_POSTGRES_LISTENER"),

cmd/config/config_yaml.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,9 @@ type ReplicationConfig struct {
116116
}
117117

118118
type PluginConfig struct {
119-
IncludeXIDs bool `mapstructure:"include_xids" yaml:"include_xids"`
119+
IncludeXIDs bool `mapstructure:"include_xids" yaml:"include_xids"`
120+
AddTables string `mapstructure:"add_tables" yaml:"add_tables"`
121+
FilterTables string `mapstructure:"filter_tables" yaml:"filter_tables"`
120122
}
121123

122124
type KafkaConfig struct {
@@ -429,6 +431,8 @@ func (c *YAMLConfig) parsePostgresListenerConfig() (*stream.PostgresListenerConf
429431
replicationSlotName = c.Source.Postgres.Replication.ReplicationSlot
430432
if c.Source.Postgres.Replication.Plugin != nil {
431433
pluginArgs.IncludeXIDs = c.Source.Postgres.Replication.Plugin.IncludeXIDs
434+
pluginArgs.AddTables = c.Source.Postgres.Replication.Plugin.AddTables
435+
pluginArgs.FilterTables = c.Source.Postgres.Replication.Plugin.FilterTables
432436
}
433437
}
434438
streamCfg.Replication = pgreplication.Config{

pkg/wal/replication/postgres/pg_replication_handler.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,9 @@ type Config struct {
4747
}
4848

4949
type PluginArguments struct {
50-
IncludeXIDs bool
50+
IncludeXIDs bool
51+
AddTables string // wal2json add-tables option (e.g., "public.*")
52+
FilterTables string // wal2json filter-tables option (e.g., "pipelines.*,private.*")
5153
}
5254

5355
type Option func(h *Handler)
@@ -110,6 +112,12 @@ func NewHandler(ctx context.Context, cfg Config, opts ...Option) (*Handler, erro
110112
if cfg.PluginArguments.IncludeXIDs {
111113
h.pluginArguments = append(h.pluginArguments, `"include-xids" '1'`)
112114
}
115+
if cfg.PluginArguments.AddTables != "" {
116+
h.pluginArguments = append(h.pluginArguments, fmt.Sprintf(`"add-tables" '%s'`, cfg.PluginArguments.AddTables))
117+
}
118+
if cfg.PluginArguments.FilterTables != "" {
119+
h.pluginArguments = append(h.pluginArguments, fmt.Sprintf(`"filter-tables" '%s'`, cfg.PluginArguments.FilterTables))
120+
}
113121

114122
if len(cfg.IncludeTables) > 0 {
115123
h.includedTables, err = pglib.NewSchemaTableMap(cfg.IncludeTables)

0 commit comments

Comments
 (0)