Skip to content

Commit fbcee7a

Browse files
authored
Give GetListener a params struct instead of a raw string (#867)
In #848 I went through and gave all driver functions a params struct instead of whatever they had before. This tends to be more convenient because it's uniform everywhere, and means that we need to change less when a new specific parameter is added henceforth. One I'd intended to do but forgot about was `GetListener` which currently just takes a raw schema string. We haven't cut a release for the change in #848, so it's a good time to change these driver interfaces now, so here we give `GetListener` a params struct as well. This is partly driven by me investigating [1], in which it might be desirable to have a sub-schema leadership namespace so that different River clients configured with different periodic jobs could run inside the same River schema, which would necessitate a new listener parameter. I'm not sure whether this project is a good idea or not yet, so I'll need to investigate a little more. [1] #742 (comment)
1 parent f319fd6 commit fbcee7a

8 files changed

Lines changed: 17 additions & 13 deletions

File tree

client.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -701,7 +701,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
701701
// uses listen/notify. Instead, each service polls for changes it's
702702
// interested in. e.g. Elector polls to see if leader has expired.
703703
if !config.PollOnly {
704-
client.notifier = notifier.New(archetype, driver.GetListener(config.Schema))
704+
client.notifier = notifier.New(archetype, driver.GetListener(&riverdriver.GetListenenerParams{Schema: config.Schema}))
705705
client.services = append(client.services, client.notifier)
706706
}
707707
} else {

internal/leadership/elector_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ func TestElector_WithNotifier(t *testing.T) {
9393
archetype = riversharedtest.BaseServiceArchetype(t)
9494
)
9595

96-
notifier := notifier.New(archetype, driver.GetListener(schema))
96+
notifier := notifier.New(archetype, driver.GetListener(&riverdriver.GetListenenerParams{Schema: schema}))
9797
{
9898
require.NoError(t, notifier.Start(ctx))
9999
t.Cleanup(notifier.Stop)

internal/notifier/notifier_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ func TestNotifier(t *testing.T) {
6060
dbPool = cmp.Or(opts.dbPool, riversharedtest.DBPool(ctx, t))
6161
driver = riverpgxv5.New(dbPool)
6262
schema = riverdbtest.TestSchema(ctx, t, driver, nil)
63-
listener = driver.GetListener(schema)
63+
listener = driver.GetListener(&riverdriver.GetListenenerParams{Schema: schema})
6464
)
6565

6666
notifier := New(riversharedtest.BaseServiceArchetype(t), listener)

internal/riverinternaltest/riverdrivertest/riverdrivertest.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3212,7 +3212,7 @@ func setupListener[TTx any](ctx context.Context, t *testing.T, driverWithPool fu
32123212

32133213
var (
32143214
driver, schema = driverWithPool(ctx, t)
3215-
listener = driver.GetListener(schema)
3215+
listener = driver.GetListener(&riverdriver.GetListenenerParams{Schema: schema})
32163216
)
32173217

32183218
return listener, &testListenerBundle[TTx]{
@@ -3305,7 +3305,7 @@ func exerciseListener[TTx any](ctx context.Context, t *testing.T, driverWithPool
33053305

33063306
var (
33073307
driver, _ = driverWithPool(ctx, t)
3308-
listener = driver.GetListener("my_custom_schema")
3308+
listener = driver.GetListener(&riverdriver.GetListenenerParams{Schema: "my_custom_schema"})
33093309
)
33103310

33113311
require.Equal(t, "my_custom_schema", listener.Schema())
@@ -3325,7 +3325,7 @@ func exerciseListener[TTx any](ctx context.Context, t *testing.T, driverWithPool
33253325

33263326
var (
33273327
driver, _ = driverWithPool(ctx, t)
3328-
listener = driver.GetListener("")
3328+
listener = driver.GetListener(&riverdriver.GetListenenerParams{Schema: ""})
33293329
)
33303330

33313331
connectListener(ctx, t, listener)
@@ -3337,7 +3337,7 @@ func exerciseListener[TTx any](ctx context.Context, t *testing.T, driverWithPool
33373337

33383338
var (
33393339
driver, _ = driverWithPool(ctx, t)
3340-
listener = driver.GetListener("")
3340+
listener = driver.GetListener(&riverdriver.GetListenenerParams{Schema: ""})
33413341
)
33423342

33433343
connectListener(ctx, t, listener)

producer_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ func Test_Producer_CanSafelyCompleteJobsWhileFetchingNewOnes(t *testing.T) {
5959
driver := riverpgxv5.New(dbPool)
6060
exec := driver.GetExecutor()
6161
schema := riverdbtest.TestSchema(ctx, t, driver, nil)
62-
listener := driver.GetListener(schema)
62+
listener := driver.GetListener(&riverdriver.GetListenenerParams{Schema: schema})
6363
pilot := &riverpilot.StandardPilot{}
6464

6565
subscribeChan := make(chan []jobcompleter.CompleterJobUpdated, 100)
@@ -223,7 +223,7 @@ func TestProducer_WithNotifier(t *testing.T) {
223223
exec = driver.GetExecutor()
224224
jobUpdates = make(chan []jobcompleter.CompleterJobUpdated, 10)
225225
schema = riverdbtest.TestSchema(ctx, t, driver, nil)
226-
listener = driver.GetListener(schema)
226+
listener = driver.GetListener(&riverdriver.GetListenenerParams{Schema: schema})
227227
pilot = &riverpilot.StandardPilot{}
228228
queueName = fmt.Sprintf("test-producer-with-notifier-%05d", randutil.IntBetween(1, 100_000))
229229
)

riverdriver/river_driver_interface.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ type Driver[TTx any] interface {
5353
// GetListener gets a listener for purposes of receiving notifications.
5454
//
5555
// API is not stable. DO NOT USE.
56-
GetListener(schema string) Listener
56+
GetListener(params *GetListenenerParams) Listener
5757

5858
// GetMigrationDefaultLines gets default migration lines that should be
5959
// applied when using this driver. This is mainly used by riverdbtest to
@@ -209,6 +209,10 @@ type ExecutorTx interface {
209209
Rollback(ctx context.Context) error
210210
}
211211

212+
type GetListenenerParams struct {
213+
Schema string
214+
}
215+
212216
// Listener listens for notifications. In Postgres, this is a database
213217
// connection where `LISTEN` has been run.
214218
//

riverdriver/riverdatabasesql/river_database_sql_driver.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ func (d *Driver) GetExecutor() riverdriver.Executor {
5454
return &Executor{d.dbPool, templateReplaceWrapper{d.dbPool, &d.replacer}, d}
5555
}
5656

57-
func (d *Driver) GetListener(schema string) riverdriver.Listener {
57+
func (d *Driver) GetListener(params *riverdriver.GetListenenerParams) riverdriver.Listener {
5858
panic(riverdriver.ErrNotImplemented)
5959
}
6060

riverdriver/riverpgxv5/river_pgx_v5_driver.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,8 @@ func (d *Driver) GetExecutor() riverdriver.Executor {
6161
return &Executor{templateReplaceWrapper{d.dbPool, &d.replacer}, d}
6262
}
6363

64-
func (d *Driver) GetListener(schema string) riverdriver.Listener {
65-
return &Listener{dbPool: d.dbPool, schema: schema}
64+
func (d *Driver) GetListener(params *riverdriver.GetListenenerParams) riverdriver.Listener {
65+
return &Listener{dbPool: d.dbPool, schema: params.Schema}
6666
}
6767

6868
func (d *Driver) GetMigrationDefaultLines() []string { return []string{riverdriver.MigrationLineMain} }

0 commit comments

Comments
 (0)