Skip to content

Commit 03e8a3a

Browse files
committed
Add MySQL support to River
Here, add a new driver `rivermysql` that brings MySQL support to River. Similar to SQLite, it's unfortunately not quite as good as the Postgres driver, but it does the job. MySQL has more facilities than SQLite, but it's still missing some major niceties like `RETURNING`, which for many queries requires us to write an implementation using two operations -- one that performs the action, and then another that loads back the result using returned IDs. It also doesn't have listen/notify. Luckily, _like_ SQLite, any of this nastiness stays cordoned to the driver layer and doesn't leak into the mainline River code. We've got good driver testing and basic tests on clients for each driver in place so we get reasonable assurance that everything works. In the before times, I would've been fairly concerned at the additional maintenance burden that supporting another database would bring, but with the rise of LLMs I think it's more plausible that we can bring something like this in without much trouble. Incredibly, I was able to get almost all of this implemented in just one evening whereby my SQLite driver took me multiple months. I don't want to bring MySQL in as a hard dependency, so I've made the MySQL tests disabled by default. Use `RIVER_MYSQL_TESTS_ENABLED=true` to activate them. Only one CI matrix case runs MySQL tests so that we don't have to repeat them to exhaustion on every version of Go and Postgres. See also: #158
1 parent a4143a0 commit 03e8a3a

48 files changed

Lines changed: 6701 additions & 79 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.github/workflows/ci.yaml

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,12 @@ jobs:
9191
# latest Postgres version:
9292
- go-version: "1.25"
9393
postgres-version: 18
94+
95+
# Run with MySQL enabled on the latest Go + Postgres to exercise the
96+
# MySQL driver in CI. MySQL tests are opt-in via RIVER_MYSQL_TESTS_ENABLED.
97+
- go-version: "1.26"
98+
postgres-version: 18
99+
mysql-enabled: true
94100
fail-fast: false
95101
timeout-minutes: 5
96102

@@ -128,7 +134,17 @@ jobs:
128134
- name: Set up database
129135
run: psql -c "CREATE DATABASE river_test" $ADMIN_DATABASE_URL
130136

137+
# MySQL is pre-installed on GitHub-hosted Ubuntu runners. Start the
138+
# service and configure passwordless root access for MySQL-enabled runs.
139+
- name: Start MySQL
140+
if: matrix.mysql-enabled
141+
run: |
142+
sudo systemctl start mysql
143+
mysql -u root -proot -e "ALTER USER 'root'@'localhost' IDENTIFIED BY ''; FLUSH PRIVILEGES;"
144+
131145
- name: Test
146+
env:
147+
RIVER_MYSQL_TESTS_ENABLED: ${{ matrix.mysql-enabled && 'true' || '' }}
132148
run: make test/race
133149

134150
cli:

Makefile

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ generate/migrations: ## Sync changes of pgxv5 migrations to database/sql
2727
.PHONY: generate/sqlc
2828
generate/sqlc: ## Generate sqlc
2929
cd riverdriver/riverdatabasesql/internal/dbsqlc && sqlc generate
30+
cd riverdriver/rivermysql/internal/dbsqlc && sqlc generate
3031
cd riverdriver/riverpgxv5/internal/dbsqlc && sqlc generate
3132
cd riverdriver/riversqlite/internal/dbsqlc && sqlc generate
3233

@@ -101,5 +102,6 @@ verify/migrations: ## Verify synced migrations
101102
.PHONY: verify/sqlc
102103
verify/sqlc: ## Verify generated sqlc
103104
cd riverdriver/riverdatabasesql/internal/dbsqlc && sqlc diff
105+
cd riverdriver/rivermysql/internal/dbsqlc && sqlc diff
104106
cd riverdriver/riverpgxv5/internal/dbsqlc && sqlc diff
105107
cd riverdriver/riversqlite/internal/dbsqlc && sqlc diff

client.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2281,9 +2281,12 @@ type JobListResult struct {
22812281
LastCursor *JobListCursor
22822282
}
22832283

2284-
const databaseNameSQLite = "sqlite"
2284+
const (
2285+
databaseNameMySQL = "mysql"
2286+
databaseNameSQLite = "sqlite"
2287+
)
22852288

2286-
var errJobListParamsMetadataNotSupportedSQLite = errors.New("JobListParams.Metadata is not supported on SQLite")
2289+
var errJobListParamsMetadataNotSupported = errors.New("JobListParams.Metadata is not supported on MySQL or SQLite")
22872290

22882291
// JobList returns a paginated list of jobs matching the provided filters. The
22892292
// provided context is used for the underlying Postgres query and can be used to
@@ -2304,8 +2307,8 @@ func (c *Client[TTx]) JobList(ctx context.Context, params *JobListParams) (*JobL
23042307
}
23052308
params.schema = c.config.Schema
23062309

2307-
if c.driver.DatabaseName() == databaseNameSQLite && params.metadataCalled {
2308-
return nil, errJobListParamsMetadataNotSupportedSQLite
2310+
if (c.driver.DatabaseName() == databaseNameMySQL || c.driver.DatabaseName() == databaseNameSQLite) && params.metadataCalled {
2311+
return nil, errJobListParamsMetadataNotSupported
23092312
}
23102313

23112314
dbParams, err := params.toDBParams()
@@ -2345,8 +2348,8 @@ func (c *Client[TTx]) JobListTx(ctx context.Context, tx TTx, params *JobListPara
23452348
}
23462349
params.schema = c.config.Schema
23472350

2348-
if c.driver.DatabaseName() == databaseNameSQLite && params.metadataCalled {
2349-
return nil, errJobListParamsMetadataNotSupportedSQLite
2351+
if (c.driver.DatabaseName() == databaseNameMySQL || c.driver.DatabaseName() == databaseNameSQLite) && params.metadataCalled {
2352+
return nil, errJobListParamsMetadataNotSupported
23502353
}
23512354

23522355
dbParams, err := params.toDBParams()

go.work

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,10 @@ use (
99
./riverdriver/riverdatabasesql
1010
./riverdriver/riverdrivertest
1111
./riverdriver/riverpgxv5
12+
./riverdriver/rivermysql
1213
./riverdriver/riversqlite
1314
./rivershared
1415
./rivertype
1516
)
17+
18+
replace github.com/riverqueue/river/riverdriver/rivermysql v0.35.0 => ./riverdriver/rivermysql

job_list_params.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -347,10 +347,10 @@ func (p *JobListParams) Kinds(kinds ...string) *JobListParams {
347347
//
348348
// https://www.postgresql.org/docs/current/functions-json.html
349349
//
350-
// This function isn't supported in SQLite due to SQLite not having an
351-
// equivalent operator to use, so there's no efficient way to implement it. We
352-
// recommend the use of Where using a condition with a comparison on the `->>`
353-
// operator instead.
350+
// This function isn't supported in MySQL or SQLite due to neither having a
351+
// direct equivalent to Postgres's `@>` operator. We recommend the use of
352+
// [JobListParams.Where] with a database-specific JSON comparison instead (e.g.
353+
// `JSON_EXTRACT` for MySQL, `->>` for SQLite).
354354
func (p *JobListParams) Metadata(json string) *JobListParams {
355355
paramsCopy := p.copy()
356356
paramsCopy.metadataCalled = true

riverdbtest/riverdbtest.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -471,7 +471,8 @@ type TestTxOpts struct {
471471
// run using TestSchema. This is meant for environments where parallelism
472472
// doesn't work as well, like SQLite, which will emit "busy" errors when
473473
// multiple clients try to share a schema, even when they're in separate
474-
// transactions.
474+
// transactions. Also applies to MySQL, where InnoDB deadlocks are common
475+
// when multiple transactions are sharing a database.
475476
DisableSchemaSharing bool
476477

477478
// IsTestTxHelper should be set to true for if TestTx is being called from

riverdriver/riverdrivertest/driver_client_test.go

Lines changed: 33 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"testing"
88
"time"
99

10+
_ "github.com/go-sql-driver/mysql"
1011
"github.com/jackc/pgx/v5"
1112
"github.com/jackc/pgx/v5/stdlib"
1213
"github.com/lib/pq"
@@ -18,6 +19,7 @@ import (
1819
"github.com/riverqueue/river/riverdbtest"
1920
"github.com/riverqueue/river/riverdriver"
2021
"github.com/riverqueue/river/riverdriver/riverdatabasesql"
22+
"github.com/riverqueue/river/riverdriver/rivermysql"
2123
"github.com/riverqueue/river/riverdriver/riverpgxv5"
2224
"github.com/riverqueue/river/riverdriver/riversqlite"
2325
"github.com/riverqueue/river/rivershared/riversharedtest"
@@ -109,6 +111,26 @@ func TestClientWithDriverRiverLibSQL(t *testing.T) {
109111
)
110112
}
111113

114+
func TestClientWithDriverRiverMySQL(t *testing.T) {
115+
t.Parallel()
116+
117+
riversharedtest.SkipIfMySQLNotEnabled(t)
118+
119+
var (
120+
ctx = context.Background()
121+
dbPool = riversharedtest.DBPoolMySQL(ctx, t)
122+
driver = rivermysql.New(dbPool)
123+
)
124+
125+
ExerciseClient(ctx, t,
126+
func(ctx context.Context, t *testing.T) (riverdriver.Driver[*sql.Tx], string) {
127+
t.Helper()
128+
129+
return driver, riverdbtest.TestSchema(ctx, t, driver, nil)
130+
},
131+
)
132+
}
133+
112134
func TestClientWithDriverRiverSQLiteModernC(t *testing.T) {
113135
t.Parallel()
114136

@@ -519,9 +541,9 @@ func ExerciseClient[TTx any](ctx context.Context, t *testing.T,
519541
})
520542

521543
listRes, err := client.JobList(ctx, river.NewJobListParams().Metadata(`{"foo":"bar"}`))
522-
if bundle.driver.DatabaseName() == databaseNameSQLite {
523-
t.Logf("Ignoring unsupported JobListResult.Metadata on SQLite")
524-
require.EqualError(t, err, "JobListParams.Metadata is not supported on SQLite")
544+
if bundle.driver.DatabaseName() == databaseNameMySQL || bundle.driver.DatabaseName() == databaseNameSQLite {
545+
t.Logf("Ignoring unsupported JobListResult.Metadata on %s", bundle.driver.DatabaseName())
546+
require.EqualError(t, err, "JobListParams.Metadata is not supported on MySQL or SQLite")
525547
return
526548
}
527549
require.NoError(t, err)
@@ -583,9 +605,9 @@ func ExerciseClient[TTx any](ctx context.Context, t *testing.T,
583605
})
584606

585607
listRes, err := client.JobListTx(ctx, tx, river.NewJobListParams().Metadata(`{"foo":"bar"}`))
586-
if bundle.driver.DatabaseName() == databaseNameSQLite {
587-
t.Logf("Ignoring unsupported JobListTxResult.Metadata on SQLite")
588-
require.EqualError(t, err, "JobListParams.Metadata is not supported on SQLite")
608+
if bundle.driver.DatabaseName() == databaseNameMySQL || bundle.driver.DatabaseName() == databaseNameSQLite {
609+
t.Logf("Ignoring unsupported JobListTxResult.Metadata on %s", bundle.driver.DatabaseName())
610+
require.EqualError(t, err, "JobListParams.Metadata is not supported on MySQL or SQLite")
589611
return
590612
}
591613
require.NoError(t, err)
@@ -607,9 +629,12 @@ func ExerciseClient[TTx any](ctx context.Context, t *testing.T,
607629

608630
listParams := river.NewJobListParams()
609631

610-
if bundle.driver.DatabaseName() == databaseNameSQLite {
632+
switch bundle.driver.DatabaseName() {
633+
case databaseNameSQLite:
611634
listParams = listParams.Where("metadata ->> @json_path = @json_val", river.NamedArgs{"json_path": "$.foo", "json_val": "bar"})
612-
} else {
635+
case databaseNameMySQL:
636+
listParams = listParams.Where("JSON_UNQUOTE(JSON_EXTRACT(metadata, @json_path)) = @json_val", river.NamedArgs{"json_path": "$.foo", "json_val": "bar"})
637+
default:
613638
// "bar" is quoted in this branch because `jsonb_path_query_first` needs to be compared to a JSON value
614639
listParams = listParams.Where("jsonb_path_query_first(metadata, @json_path) = @json_val", river.NamedArgs{"json_path": "$.foo", "json_val": `"bar"`})
615640
}

riverdriver/riverdrivertest/driver_test.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"testing"
99
"time"
1010

11+
_ "github.com/go-sql-driver/mysql"
1112
"github.com/jackc/pgx/v5"
1213
"github.com/jackc/pgx/v5/pgxpool"
1314
"github.com/jackc/pgx/v5/stdlib"
@@ -21,6 +22,7 @@ import (
2122
"github.com/riverqueue/river/riverdriver"
2223
"github.com/riverqueue/river/riverdriver/riverdatabasesql"
2324
"github.com/riverqueue/river/riverdriver/riverdrivertest"
25+
"github.com/riverqueue/river/riverdriver/rivermysql"
2426
"github.com/riverqueue/river/riverdriver/riverpgxv5"
2527
"github.com/riverqueue/river/riverdriver/riversqlite"
2628
"github.com/riverqueue/river/rivershared/riversharedtest"
@@ -203,6 +205,43 @@ func TestDriverRiverLiteLibSQL(t *testing.T) { //nolint:dupl
203205
})
204206
}
205207

208+
func TestDriverRiverMySQL(t *testing.T) {
209+
t.Parallel()
210+
211+
riversharedtest.SkipIfMySQLNotEnabled(t)
212+
213+
var (
214+
ctx = context.Background()
215+
dbPool = riversharedtest.DBPoolMySQL(ctx, t)
216+
driver = rivermysql.New(dbPool)
217+
)
218+
219+
riverdrivertest.Exercise(ctx, t,
220+
func(ctx context.Context, t *testing.T, opts *riverdbtest.TestSchemaOpts) (riverdriver.Driver[*sql.Tx], string) {
221+
t.Helper()
222+
223+
return driver, riverdbtest.TestSchema(ctx, t, driver, opts)
224+
},
225+
func(ctx context.Context, t *testing.T) (riverdriver.Executor, riverdriver.Driver[*sql.Tx]) {
226+
t.Helper()
227+
228+
tx, schema := riverdbtest.TestTx(ctx, t, driver, &riverdbtest.TestTxOpts{
229+
// Disable schema sharing to reduce InnoDB deadlocks from
230+
// parallel tests contending on the same database.
231+
DisableSchemaSharing: true,
232+
})
233+
234+
// MySQL has no search_path equivalent, so USE the test schema
235+
// database so that unqualified queries resolve correctly.
236+
if schema != "" {
237+
_, err := tx.ExecContext(ctx, "USE "+schema)
238+
require.NoError(t, err)
239+
}
240+
241+
return driver.UnwrapExecutor(tx), driver
242+
})
243+
}
244+
206245
func TestDriverRiverSQLiteModernC(t *testing.T) { //nolint:dupl
207246
t.Parallel()
208247

riverdriver/riverdrivertest/executor_tx.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,13 @@ func exerciseExecutorTx[TTx any](ctx context.Context, t *testing.T,
157157

158158
exec := setup(ctx, t)
159159

160-
require.NoError(t, exec.Exec(ctx, "SELECT $1 || $2", "foo", "bar"))
160+
_, driver := executorWithTx(ctx, t)
161+
switch driver.DatabaseName() {
162+
case databaseNameMySQL:
163+
require.NoError(t, exec.Exec(ctx, "SELECT CONCAT(?, ?)", "foo", "bar"))
164+
default:
165+
require.NoError(t, exec.Exec(ctx, "SELECT $1 || $2", "foo", "bar"))
166+
}
161167
})
162168
})
163169

@@ -166,9 +172,8 @@ func exerciseExecutorTx[TTx any](ctx context.Context, t *testing.T,
166172

167173
{
168174
driver, _ := driverWithSchema(ctx, t, nil)
169-
if driver.DatabaseName() == databaseNameSQLite {
170-
t.Logf("Skipping PGAdvisoryXactLock test for SQLite")
171-
return
175+
if driver.DatabaseName() == databaseNameSQLite || driver.DatabaseName() == databaseNameMySQL {
176+
t.Skipf("Skipping PGAdvisoryXactLock test for %s", driver.DatabaseName())
172177
}
173178
}
174179

riverdriver/riverdrivertest/go.mod

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,9 @@ require (
1111
github.com/lib/pq v1.12.3
1212
github.com/riverqueue/river v0.35.0
1313
github.com/riverqueue/river/riverdriver v0.35.0
14+
github.com/go-sql-driver/mysql v1.9.3
1415
github.com/riverqueue/river/riverdriver/riverdatabasesql v0.35.0
16+
github.com/riverqueue/river/riverdriver/rivermysql v0.35.0
1517
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.35.0
1618
github.com/riverqueue/river/riverdriver/riversqlite v0.35.0
1719
github.com/riverqueue/river/rivershared v0.35.0
@@ -27,6 +29,7 @@ require (
2729
require (
2830
github.com/antlr4-go/antlr/v4 v4.13.0 // indirect
2931
github.com/coder/websocket v1.8.12 // indirect
32+
filippo.io/edwards25519 v1.1.0 // indirect
3033
github.com/dustin/go-humanize v1.0.1 // indirect
3134
github.com/google/uuid v1.6.0 // indirect
3235
github.com/jackc/pgpassfile v1.0.0 // indirect

0 commit comments

Comments
 (0)