Skip to content

Commit 5bec84c

Browse files
committed
Full schema insertion + schema testing
Here, introduce a full version of schema injection as initially started in #798, and exposing a new `Schema` option to make it configurable even outside the test suite. This involved fixing a lot of bugs from #798, and the only way to make it possible to root them all out was to make full use of schemas across the entire test suite. The original "test DB manager" system has been replaced with a new `riverschematest.TestSchema` helper that generates a schema for use with a test case or prefers an existing idle one that was already generated for the same test run. `TestSchema` runs migrations for generated schemas which also means we don't need to use `testdbman` anymore, with tests capable of bootstrapping themselves at run time. We reuse schemas to avoid this extra work when possible, but migrating a new schema is also surprisingly fast, taking up to 50ms, but getting more down to a stable ~10ms once things are warmed up. Here's a series of sample timings: $ go test ./rivermigrate -run TestMigrator/MigrateUpDefault -test.v -count 50 | grep 'migrations ran in' river_migrate_test.go:492: migrations ran in 45.571209ms river_migrate_test.go:492: migrations ran in 24.642458ms river_migrate_test.go:492: migrations ran in 16.749708ms river_migrate_test.go:492: migrations ran in 22.970375ms river_migrate_test.go:492: migrations ran in 16.201375ms river_migrate_test.go:492: migrations ran in 15.727625ms river_migrate_test.go:492: migrations ran in 13.291333ms river_migrate_test.go:492: migrations ran in 13.680708ms river_migrate_test.go:492: migrations ran in 14.867416ms river_migrate_test.go:492: migrations ran in 15.631916ms river_migrate_test.go:492: migrations ran in 13.873791ms river_migrate_test.go:492: migrations ran in 14.8645ms river_migrate_test.go:492: migrations ran in 14.92575ms river_migrate_test.go:492: migrations ran in 12.541834ms river_migrate_test.go:492: migrations ran in 14.753875ms river_migrate_test.go:492: migrations ran in 12.694334ms river_migrate_test.go:492: migrations ran in 13.955917ms river_migrate_test.go:492: migrations ran in 12.126458ms river_migrate_test.go:492: migrations ran in 14.095958ms river_migrate_test.go:492: migrations ran in 13.273375ms river_migrate_test.go:492: migrations ran in 13.988917ms river_migrate_test.go:492: migrations ran in 13.141459ms river_migrate_test.go:492: migrations ran in 12.394417ms river_migrate_test.go:492: migrations ran in 11.539208ms river_migrate_test.go:492: migrations ran in 11.577834ms river_migrate_test.go:492: migrations ran in 10.883375ms river_migrate_test.go:492: migrations ran in 10.547417ms river_migrate_test.go:492: migrations ran in 12.330375ms river_migrate_test.go:492: migrations ran in 11.54575ms river_migrate_test.go:492: migrations ran in 11.437458ms river_migrate_test.go:492: migrations ran in 10.957ms river_migrate_test.go:492: migrations ran in 10.589083ms river_migrate_test.go:492: migrations ran in 9.758583ms Removal of the "test DB manager" system also means that we can ungate test from `-p 1` because they're all able to run in parallel now. The limiting factor I ran in is that we need to keep max pool connections within each package's tests to a relatively modest number (I found 15 seemed to maximum success) so parallel packages don't exceed the default Postgres configuration of 100 connections. Something that can be kind of annoying is that in case a schema isn't used properly somewhere in a test case (i.e. `TestSchema` is run, but then not used), inserts/operations will go the default schema, which will leave debris there, and that will interfere with test cases using `TestTx` (with test DB manager, all debris would go to a different database so you wouldn't notice). To remediate this, I've added a cleanup hook to `TestSchema` that looks for leftovers that may have been added to the default schema. This isn't perfect because those leftovers may have come from another test case running in parallel or which ran previously, but it helps to zero in on the original source of the issue.
1 parent cc5f0e0 commit 5bec84c

85 files changed

Lines changed: 1955 additions & 2226 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.github/workflows/ci.yaml

Lines changed: 6 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ env:
88
DATABASE_URL: postgres://postgres:postgres@127.0.0.1:5432/river_dev?sslmode=disable
99

1010
# Test database.
11-
TEST_DATABASE_URL: postgres://postgres:postgres@127.0.0.1:5432/river_test?sslmode=disable
11+
TEST_DATABASE_URL: postgres://postgres:postgres@127.0.0.1:5432/river_test?pool_max_conns=50&sslmode=disable
1212

1313
on:
1414
push:
@@ -62,42 +62,13 @@ jobs:
6262
- name: Display Go version
6363
run: go version
6464

65-
- name: Set up test DBs
66-
run: go run ./internal/cmd/testdbman create
67-
env:
68-
PGHOST: 127.0.0.1
69-
PGPORT: 5432
70-
PGUSER: postgres
71-
PGPASSWORD: postgres
72-
PGSSLMODE: disable
65+
- name: Set up database
66+
run: |
67+
psql -c "CREATE DATABASE river_test" postgres://postgres:postgres@localhost:5432
68+
go run github.com/riverqueue/river/cmd/river@latest migrate-up --database-url "$TEST_DATABASE_URL"
7369
7470
- name: Test
75-
working-directory: .
76-
run: go test -p 1 -race ./... -timeout 2m
77-
78-
- name: Test cmd/river
79-
working-directory: ./cmd/river
80-
run: go test -race ./... -timeout 2m
81-
82-
- name: Test riverdriver
83-
working-directory: ./riverdriver
84-
run: go test -race ./... -timeout 2m
85-
86-
- name: Test riverdriver/riverdatabasesql
87-
working-directory: ./riverdriver/riverdatabasesql
88-
run: go test -race ./... -timeout 2m
89-
90-
- name: Test riverdriver/riverpgxv5
91-
working-directory: ./riverdriver/riverpgxv5
92-
run: go test -race ./... -timeout 2m
93-
94-
- name: Test rivershared
95-
working-directory: ./rivershared
96-
run: go test -race ./... -timeout 2m
97-
98-
- name: Test rivertype
99-
working-directory: ./rivertype
100-
run: go test -race ./... -timeout 2m
71+
run: make test
10172

10273
cli:
10374
strategy:

.golangci.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ linters:
8888
- r
8989
- sb # common convention for string builder
9090
- t
91+
- tb
9192
- tt # common convention for table tests
9293
- tx
9394
- w

Makefile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,14 +53,14 @@ $(foreach mod,$(submodules),$(eval $(call lint-target,$(mod))))
5353
.PHONY: test
5454
test:: ## Run test suite for all submodules
5555
define test-target
56-
test:: ; cd $1 && go test ./... -p 1
56+
test:: ; cd $1 && go test ./...
5757
endef
5858
$(foreach mod,$(submodules),$(eval $(call test-target,$(mod))))
5959

6060
.PHONY: test/race
6161
test/race:: ## Run test suite for all submodules with race detector
6262
define test-race-target
63-
test/race:: ; cd $1 && go test ./... -p 1 -race
63+
test/race:: ; cd $1 && go test ./... -race
6464
endef
6565
$(foreach mod,$(submodules),$(eval $(call test-race-target,$(mod))))
6666

client.go

Lines changed: 38 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -282,12 +282,12 @@ type Config struct {
282282
// Defaults to DefaultRetryPolicy.
283283
RetryPolicy ClientRetryPolicy
284284

285-
// schema is a non-standard schema where River tables are located. All table
285+
// Schema is a non-standard Schema where River tables are located. All table
286286
// references in database queries will use this value as a prefix.
287287
//
288288
// Defaults to empty, which causes the client to look for tables using the
289289
// setting of Postgres `search_path`.
290-
schema string
290+
Schema string
291291

292292
// SkipUnknownJobCheck is a flag to control whether the client should skip
293293
// checking to see if a registered worker exists in the client's worker bundle
@@ -383,7 +383,7 @@ func (c *Config) WithDefaults() *Config {
383383
ReindexerSchedule: c.ReindexerSchedule,
384384
RescueStuckJobsAfter: valutil.ValOrDefault(c.RescueStuckJobsAfter, rescueAfter),
385385
RetryPolicy: retryPolicy,
386-
schema: c.schema,
386+
Schema: c.Schema,
387387
SkipUnknownJobCheck: c.SkipUnknownJobCheck,
388388
Test: c.Test,
389389
TestOnly: c.TestOnly,
@@ -689,7 +689,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
689689
// uses listen/notify. Instead, each service polls for changes it's
690690
// interested in. e.g. Elector polls to see if leader has expired.
691691
if !config.PollOnly {
692-
client.notifier = notifier.New(archetype, driver.GetListener(config.schema))
692+
client.notifier = notifier.New(archetype, driver.GetListener(config.Schema))
693693
client.services = append(client.services, client.notifier)
694694
}
695695
} else {
@@ -698,6 +698,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
698698

699699
client.elector = leadership.NewElector(archetype, driver.GetExecutor(), client.notifier, &leadership.Config{
700700
ClientID: config.ID,
701+
Schema: config.Schema,
701702
})
702703
client.services = append(client.services, client.elector)
703704

@@ -726,7 +727,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
726727
CancelledJobRetentionPeriod: config.CancelledJobRetentionPeriod,
727728
CompletedJobRetentionPeriod: config.CompletedJobRetentionPeriod,
728729
DiscardedJobRetentionPeriod: config.DiscardedJobRetentionPeriod,
729-
Schema: config.schema,
730+
Schema: config.Schema,
730731
Timeout: config.JobCleanerTimeout,
731732
}, driver.GetExecutor())
732733
maintenanceServices = append(maintenanceServices, jobCleaner)
@@ -737,7 +738,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
737738
jobRescuer := maintenance.NewRescuer(archetype, &maintenance.JobRescuerConfig{
738739
ClientRetryPolicy: config.RetryPolicy,
739740
RescueAfter: config.RescueStuckJobsAfter,
740-
Schema: config.schema,
741+
Schema: config.Schema,
741742
WorkUnitFactoryFunc: func(kind string) workunit.WorkUnitFactory {
742743
if workerInfo, ok := config.Workers.workersMap[kind]; ok {
743744
return workerInfo.workUnitFactory
@@ -753,7 +754,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
753754
jobScheduler := maintenance.NewJobScheduler(archetype, &maintenance.JobSchedulerConfig{
754755
Interval: config.schedulerInterval,
755756
NotifyInsert: client.maybeNotifyInsertForQueues,
756-
Schema: config.schema,
757+
Schema: config.Schema,
757758
}, driver.GetExecutor())
758759
maintenanceServices = append(maintenanceServices, jobScheduler)
759760
client.testSignals.jobScheduler = &jobScheduler.TestSignals
@@ -774,7 +775,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
774775
{
775776
queueCleaner := maintenance.NewQueueCleaner(archetype, &maintenance.QueueCleanerConfig{
776777
RetentionPeriod: maintenance.QueueRetentionPeriodDefault,
777-
Schema: config.schema,
778+
Schema: config.Schema,
778779
}, driver.GetExecutor())
779780
maintenanceServices = append(maintenanceServices, queueCleaner)
780781
client.testSignals.queueCleaner = &queueCleaner.TestSignals
@@ -788,7 +789,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
788789

789790
reindexer := maintenance.NewReindexer(archetype, &maintenance.ReindexerConfig{
790791
ScheduleFunc: scheduleFunc,
791-
Schema: config.schema,
792+
Schema: config.Schema,
792793
}, driver.GetExecutor())
793794
maintenanceServices = append(maintenanceServices, reindexer)
794795
client.testSignals.reindexer = &reindexer.TestSignals
@@ -1264,7 +1265,7 @@ func (c *Client[TTx]) jobCancel(ctx context.Context, exec riverdriver.Executor,
12641265
ID: jobID,
12651266
CancelAttemptedAt: c.baseService.Time.NowUTC(),
12661267
ControlTopic: string(notifier.NotificationTopicControl),
1267-
Schema: c.config.schema,
1268+
Schema: c.config.Schema,
12681269
})
12691270
}
12701271

@@ -1274,7 +1275,7 @@ func (c *Client[TTx]) jobCancel(ctx context.Context, exec riverdriver.Executor,
12741275
func (c *Client[TTx]) JobDelete(ctx context.Context, id int64) (*rivertype.JobRow, error) {
12751276
return c.driver.GetExecutor().JobDelete(ctx, &riverdriver.JobDeleteParams{
12761277
ID: id,
1277-
Schema: c.config.schema,
1278+
Schema: c.config.Schema,
12781279
})
12791280
}
12801281

@@ -1287,7 +1288,7 @@ func (c *Client[TTx]) JobDelete(ctx context.Context, id int64) (*rivertype.JobRo
12871288
func (c *Client[TTx]) JobDeleteTx(ctx context.Context, tx TTx, id int64) (*rivertype.JobRow, error) {
12881289
return c.driver.UnwrapExecutor(tx).JobDelete(ctx, &riverdriver.JobDeleteParams{
12891290
ID: id,
1290-
Schema: c.config.schema,
1291+
Schema: c.config.Schema,
12911292
})
12921293
}
12931294

@@ -1296,7 +1297,7 @@ func (c *Client[TTx]) JobDeleteTx(ctx context.Context, tx TTx, id int64) (*river
12961297
func (c *Client[TTx]) JobGet(ctx context.Context, id int64) (*rivertype.JobRow, error) {
12971298
return c.driver.GetExecutor().JobGetByID(ctx, &riverdriver.JobGetByIDParams{
12981299
ID: id,
1299-
Schema: c.config.schema,
1300+
Schema: c.config.Schema,
13001301
})
13011302
}
13021303

@@ -1306,7 +1307,7 @@ func (c *Client[TTx]) JobGet(ctx context.Context, id int64) (*rivertype.JobRow,
13061307
func (c *Client[TTx]) JobGetTx(ctx context.Context, tx TTx, id int64) (*rivertype.JobRow, error) {
13071308
return c.driver.UnwrapExecutor(tx).JobGetByID(ctx, &riverdriver.JobGetByIDParams{
13081309
ID: id,
1309-
Schema: c.config.schema,
1310+
Schema: c.config.Schema,
13101311
})
13111312
}
13121313

@@ -1321,7 +1322,7 @@ func (c *Client[TTx]) JobGetTx(ctx context.Context, tx TTx, id int64) (*rivertyp
13211322
func (c *Client[TTx]) JobRetry(ctx context.Context, id int64) (*rivertype.JobRow, error) {
13221323
return c.driver.GetExecutor().JobRetry(ctx, &riverdriver.JobRetryParams{
13231324
ID: id,
1324-
Schema: c.config.schema,
1325+
Schema: c.config.Schema,
13251326
})
13261327
}
13271328

@@ -1341,7 +1342,7 @@ func (c *Client[TTx]) JobRetry(ctx context.Context, id int64) (*rivertype.JobRow
13411342
func (c *Client[TTx]) JobRetryTx(ctx context.Context, tx TTx, id int64) (*rivertype.JobRow, error) {
13421343
return c.driver.UnwrapExecutor(tx).JobRetry(ctx, &riverdriver.JobRetryParams{
13431344
ID: id,
1344-
Schema: c.config.schema,
1345+
Schema: c.config.Schema,
13451346
})
13461347
}
13471348

@@ -1610,7 +1611,7 @@ func (c *Client[TTx]) insertMany(ctx context.Context, tx riverdriver.ExecutorTx,
16101611
return c.insertManyShared(ctx, tx, insertParams, func(ctx context.Context, insertParams []*riverdriver.JobInsertFastParams) ([]*rivertype.JobInsertResult, error) {
16111612
results, err := c.pilot.JobInsertMany(ctx, tx, &riverdriver.JobInsertFastManyParams{
16121613
Jobs: insertParams,
1613-
Schema: c.config.schema,
1614+
Schema: c.config.Schema,
16141615
})
16151616
if err != nil {
16161617
return nil, err
@@ -1774,16 +1775,16 @@ func (c *Client[TTx]) InsertManyFastTx(ctx context.Context, tx TTx, params []Ins
17741775
return c.insertManyFast(ctx, exec, params)
17751776
}
17761777

1777-
func (c *Client[TTx]) insertManyFast(ctx context.Context, tx riverdriver.ExecutorTx, params []InsertManyParams) (int, error) {
1778+
func (c *Client[TTx]) insertManyFast(ctx context.Context, execTx riverdriver.ExecutorTx, params []InsertManyParams) (int, error) {
17781779
insertParams, err := c.insertManyParams(params)
17791780
if err != nil {
17801781
return 0, err
17811782
}
17821783

1783-
results, err := c.insertManyShared(ctx, tx, insertParams, func(ctx context.Context, insertParams []*riverdriver.JobInsertFastParams) ([]*rivertype.JobInsertResult, error) {
1784-
count, err := tx.JobInsertFastManyNoReturning(ctx, &riverdriver.JobInsertFastManyParams{
1784+
results, err := c.insertManyShared(ctx, execTx, insertParams, func(ctx context.Context, insertParams []*riverdriver.JobInsertFastParams) ([]*rivertype.JobInsertResult, error) {
1785+
count, err := execTx.JobInsertFastManyNoReturning(ctx, &riverdriver.JobInsertFastManyParams{
17851786
Jobs: insertParams,
1786-
Schema: c.config.schema,
1787+
Schema: c.config.Schema,
17871788
})
17881789
if err != nil {
17891790
return nil, err
@@ -1827,7 +1828,7 @@ func (c *Client[TTx]) maybeNotifyInsertForQueues(ctx context.Context, tx riverdr
18271828

18281829
err := tx.NotifyMany(ctx, &riverdriver.NotifyManyParams{
18291830
Payload: payloads,
1830-
Schema: c.config.schema,
1831+
Schema: c.config.Schema,
18311832
Topic: string(notifier.NotificationTopicInsert),
18321833
})
18331834
if err != nil {
@@ -1858,7 +1859,7 @@ func (c *Client[TTx]) notifyQueuePauseOrResume(ctx context.Context, tx riverdriv
18581859

18591860
err = tx.NotifyMany(ctx, &riverdriver.NotifyManyParams{
18601861
Payload: []string{string(payload)},
1861-
Schema: c.config.schema,
1862+
Schema: c.config.Schema,
18621863
Topic: string(notifier.NotificationTopicControl),
18631864
})
18641865
if err != nil {
@@ -1905,7 +1906,7 @@ func (c *Client[TTx]) addProducer(queueName string, queueConfig QueueConfig) *pr
19051906
QueueEventCallback: c.subscriptionManager.distributeQueueEvent,
19061907
RetryPolicy: c.config.RetryPolicy,
19071908
SchedulerInterval: c.config.schedulerInterval,
1908-
Schema: c.config.schema,
1909+
Schema: c.config.Schema,
19091910
StaleProducerRetentionPeriod: 5 * time.Minute,
19101911
Workers: c.config.Workers,
19111912
})
@@ -1955,6 +1956,8 @@ func (c *Client[TTx]) JobList(ctx context.Context, params *JobListParams) (*JobL
19551956
if params == nil {
19561957
params = NewJobListParams()
19571958
}
1959+
params.schema = c.config.Schema
1960+
19581961
dbParams, err := params.toDBParams()
19591962
if err != nil {
19601963
return nil, err
@@ -1984,6 +1987,7 @@ func (c *Client[TTx]) JobListTx(ctx context.Context, tx TTx, params *JobListPara
19841987
if params == nil {
19851988
params = NewJobListParams()
19861989
}
1990+
params.schema = c.config.Schema
19871991

19881992
dbParams, err := params.toDBParams()
19891993
if err != nil {
@@ -2024,7 +2028,7 @@ func (c *Client[TTx]) Queues() *QueueBundle { return c.queues }
20242028
func (c *Client[TTx]) QueueGet(ctx context.Context, name string) (*rivertype.Queue, error) {
20252029
return c.driver.GetExecutor().QueueGet(ctx, &riverdriver.QueueGetParams{
20262030
Name: name,
2027-
Schema: c.config.schema,
2031+
Schema: c.config.Schema,
20282032
})
20292033
}
20302034

@@ -2036,7 +2040,7 @@ func (c *Client[TTx]) QueueGet(ctx context.Context, name string) (*rivertype.Que
20362040
func (c *Client[TTx]) QueueGetTx(ctx context.Context, tx TTx, name string) (*rivertype.Queue, error) {
20372041
return c.driver.UnwrapExecutor(tx).QueueGet(ctx, &riverdriver.QueueGetParams{
20382042
Name: name,
2039-
Schema: c.config.schema,
2043+
Schema: c.config.Schema,
20402044
})
20412045
}
20422046

@@ -2065,7 +2069,7 @@ func (c *Client[TTx]) QueueList(ctx context.Context, params *QueueListParams) (*
20652069

20662070
queues, err := c.driver.GetExecutor().QueueList(ctx, &riverdriver.QueueListParams{
20672071
Limit: int(params.paginationCount),
2068-
Schema: c.config.schema,
2072+
Schema: c.config.Schema,
20692073
})
20702074
if err != nil {
20712075
return nil, err
@@ -2092,7 +2096,7 @@ func (c *Client[TTx]) QueueListTx(ctx context.Context, tx TTx, params *QueueList
20922096

20932097
queues, err := c.driver.UnwrapExecutor(tx).QueueList(ctx, &riverdriver.QueueListParams{
20942098
Limit: int(params.paginationCount),
2095-
Schema: c.config.schema,
2099+
Schema: c.config.Schema,
20962100
})
20972101
if err != nil {
20982102
return nil, err
@@ -2121,7 +2125,7 @@ func (c *Client[TTx]) QueuePause(ctx context.Context, name string, opts *QueuePa
21212125

21222126
if err := tx.QueuePause(ctx, &riverdriver.QueuePauseParams{
21232127
Name: name,
2124-
Schema: c.config.schema,
2128+
Schema: c.config.Schema,
21252129
}); err != nil {
21262130
return err
21272131
}
@@ -2149,7 +2153,7 @@ func (c *Client[TTx]) QueuePauseTx(ctx context.Context, tx TTx, name string, opt
21492153

21502154
if err := executorTx.QueuePause(ctx, &riverdriver.QueuePauseParams{
21512155
Name: name,
2152-
Schema: c.config.schema,
2156+
Schema: c.config.Schema,
21532157
}); err != nil {
21542158
return err
21552159
}
@@ -2182,7 +2186,7 @@ func (c *Client[TTx]) QueueResume(ctx context.Context, name string, opts *QueueP
21822186

21832187
if err := tx.QueueResume(ctx, &riverdriver.QueueResumeParams{
21842188
Name: name,
2185-
Schema: c.config.schema,
2189+
Schema: c.config.Schema,
21862190
}); err != nil {
21872191
return err
21882192
}
@@ -2211,7 +2215,7 @@ func (c *Client[TTx]) QueueResumeTx(ctx context.Context, tx TTx, name string, op
22112215

22122216
if err := executorTx.QueueResume(ctx, &riverdriver.QueueResumeParams{
22132217
Name: name,
2214-
Schema: c.config.schema,
2218+
Schema: c.config.Schema,
22152219
}); err != nil {
22162220
return err
22172221
}
@@ -2266,6 +2270,7 @@ func (c *Client[TTx]) queueUpdate(ctx context.Context, executorTx riverdriver.Ex
22662270
Metadata: params.Metadata,
22672271
MetadataDoUpdate: updateMetadata,
22682272
Name: name,
2273+
Schema: c.config.Schema,
22692274
})
22702275
if err != nil {
22712276
return nil, err
@@ -2283,7 +2288,7 @@ func (c *Client[TTx]) queueUpdate(ctx context.Context, executorTx riverdriver.Ex
22832288

22842289
if err := executorTx.NotifyMany(ctx, &riverdriver.NotifyManyParams{
22852290
Payload: []string{string(payload)},
2286-
Schema: c.config.schema,
2291+
Schema: c.config.Schema,
22872292
Topic: string(notifier.NotificationTopicControl),
22882293
}); err != nil {
22892294
return nil, err

0 commit comments

Comments
 (0)