Skip to content

Commit 686f5b6

Browse files
authored
split riverdrivertest.go (#1138)
Instead of a giant 5000 line file, split into smaller files grouped by entity. For job queries where we have many tests, break them into files by query type.
1 parent 4fdba79 commit 686f5b6

13 files changed

Lines changed: 5149 additions & 4808 deletions

File tree

internal/jobexecutor/job_executor_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -858,7 +858,7 @@ func TestJobExecutor_Execute(t *testing.T) {
858858
require.Equal(t, rivertype.JobStateRetryable, job.State)
859859
require.Len(t, job.Errors, 1)
860860
// Sufficient enough to ensure that the stack trace is included:
861-
require.Contains(t, job.Errors[0].Trace, "river/internal/jobexecutor/job_executor.go")
861+
require.Contains(t, job.Errors[0].Trace, "internal/jobexecutor/job_executor.go")
862862
})
863863

864864
t.Run("PanicAgainAfterRetry", func(t *testing.T) {
Lines changed: 237 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,237 @@
1+
package riverdrivertest
2+
3+
import (
4+
"context"
5+
"testing"
6+
7+
"github.com/stretchr/testify/require"
8+
9+
"github.com/riverqueue/river/riverdbtest"
10+
"github.com/riverqueue/river/riverdriver"
11+
"github.com/riverqueue/river/rivershared/testfactory"
12+
"github.com/riverqueue/river/rivershared/util/hashutil"
13+
"github.com/riverqueue/river/rivershared/util/randutil"
14+
"github.com/riverqueue/river/rivertype"
15+
)
16+
17+
func exerciseExecutorTx[TTx any](ctx context.Context, t *testing.T,
18+
driverWithSchema func(ctx context.Context, t *testing.T, opts *riverdbtest.TestSchemaOpts) (riverdriver.Driver[TTx], string),
19+
executorWithTx func(ctx context.Context, t *testing.T) (riverdriver.Executor, riverdriver.Driver[TTx]),
20+
) {
21+
t.Helper()
22+
23+
setup := func(ctx context.Context, t *testing.T) riverdriver.Executor {
24+
t.Helper()
25+
26+
exec, _ := executorWithTx(ctx, t)
27+
return exec
28+
}
29+
30+
t.Run("Begin", func(t *testing.T) {
31+
t.Parallel()
32+
33+
t.Run("BasicVisibility", func(t *testing.T) {
34+
t.Parallel()
35+
36+
exec := setup(ctx, t)
37+
38+
tx, err := exec.Begin(ctx)
39+
require.NoError(t, err)
40+
t.Cleanup(func() { _ = tx.Rollback(ctx) })
41+
42+
// Job visible in subtransaction, but not parent.
43+
{
44+
job := testfactory.Job(ctx, t, tx, &testfactory.JobOpts{})
45+
_ = testfactory.Job(ctx, t, tx, &testfactory.JobOpts{})
46+
47+
_, err := tx.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: job.ID})
48+
require.NoError(t, err)
49+
50+
require.NoError(t, tx.Rollback(ctx))
51+
52+
_, err = exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: job.ID})
53+
require.ErrorIs(t, err, rivertype.ErrNotFound)
54+
}
55+
})
56+
57+
t.Run("NestedTransactions", func(t *testing.T) {
58+
t.Parallel()
59+
60+
exec := setup(ctx, t)
61+
62+
tx1, err := exec.Begin(ctx)
63+
require.NoError(t, err)
64+
t.Cleanup(func() { _ = tx1.Rollback(ctx) })
65+
66+
// Job visible in tx1, but not top level executor.
67+
{
68+
job1 := testfactory.Job(ctx, t, tx1, &testfactory.JobOpts{})
69+
70+
{
71+
tx2, err := tx1.Begin(ctx)
72+
require.NoError(t, err)
73+
t.Cleanup(func() { _ = tx2.Rollback(ctx) })
74+
75+
// Job visible in tx2, but not top level executor.
76+
{
77+
job2 := testfactory.Job(ctx, t, tx2, &testfactory.JobOpts{})
78+
79+
_, err := tx2.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: job2.ID})
80+
require.NoError(t, err)
81+
82+
require.NoError(t, tx2.Rollback(ctx))
83+
84+
_, err = tx1.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: job2.ID})
85+
require.ErrorIs(t, err, rivertype.ErrNotFound)
86+
}
87+
88+
_, err = tx1.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: job1.ID})
89+
require.NoError(t, err)
90+
}
91+
92+
// Repeat the same subtransaction again.
93+
{
94+
tx2, err := tx1.Begin(ctx)
95+
require.NoError(t, err)
96+
t.Cleanup(func() { _ = tx2.Rollback(ctx) })
97+
98+
job2 := testfactory.Job(ctx, t, tx2, &testfactory.JobOpts{})
99+
100+
_, err = tx2.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: job2.ID})
101+
require.NoError(t, err)
102+
103+
require.NoError(t, tx2.Rollback(ctx))
104+
105+
_, err = tx1.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: job2.ID})
106+
require.ErrorIs(t, err, rivertype.ErrNotFound)
107+
}
108+
109+
_, err = tx1.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: job1.ID})
110+
require.NoError(t, err)
111+
112+
require.NoError(t, tx1.Rollback(ctx))
113+
114+
_, err = exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: job1.ID})
115+
require.ErrorIs(t, err, rivertype.ErrNotFound)
116+
}
117+
})
118+
119+
t.Run("RollbackAfterCommit", func(t *testing.T) {
120+
t.Parallel()
121+
122+
exec := setup(ctx, t)
123+
124+
tx1, err := exec.Begin(ctx)
125+
require.NoError(t, err)
126+
t.Cleanup(func() { _ = tx1.Rollback(ctx) })
127+
128+
tx2, err := tx1.Begin(ctx)
129+
require.NoError(t, err)
130+
t.Cleanup(func() { _ = tx2.Rollback(ctx) })
131+
132+
job := testfactory.Job(ctx, t, tx2, &testfactory.JobOpts{})
133+
134+
require.NoError(t, tx2.Commit(ctx))
135+
_ = tx2.Rollback(ctx) // "tx is closed" error generally returned, but don't require this
136+
137+
// Despite rollback being called after commit, the job is still
138+
// visible from the outer transaction.
139+
_, err = tx1.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: job.ID})
140+
require.NoError(t, err)
141+
})
142+
})
143+
144+
t.Run("Exec", func(t *testing.T) {
145+
t.Parallel()
146+
147+
t.Run("NoArgs", func(t *testing.T) {
148+
t.Parallel()
149+
150+
exec := setup(ctx, t)
151+
152+
require.NoError(t, exec.Exec(ctx, "SELECT 1 + 2"))
153+
})
154+
155+
t.Run("WithArgs", func(t *testing.T) {
156+
t.Parallel()
157+
158+
exec := setup(ctx, t)
159+
160+
require.NoError(t, exec.Exec(ctx, "SELECT $1 || $2", "foo", "bar"))
161+
})
162+
})
163+
164+
t.Run("PGAdvisoryXactLock", func(t *testing.T) {
165+
t.Parallel()
166+
167+
{
168+
driver, _ := driverWithSchema(ctx, t, nil)
169+
if driver.DatabaseName() == databaseNameSQLite {
170+
t.Logf("Skipping PGAdvisoryXactLock test for SQLite")
171+
return
172+
}
173+
}
174+
175+
exec := setup(ctx, t)
176+
177+
// It's possible for multiple versions of this test to be running at the
178+
// same time (from different drivers), so make sure the lock we're
179+
// acquiring per test is unique by using the complete test name. Also
180+
// add randomness in case a test is run multiple times with `-count`.
181+
lockHash := hashutil.NewAdvisoryLockHash(0)
182+
lockHash.Write([]byte(t.Name()))
183+
lockHash.Write([]byte(randutil.Hex(10)))
184+
key := lockHash.Key()
185+
186+
// Tries to acquire the given lock from another test transaction and
187+
// returns true if the lock was acquired.
188+
tryAcquireLock := func(exec riverdriver.Executor) bool {
189+
var lockAcquired bool
190+
require.NoError(t, exec.QueryRow(ctx, "SELECT pg_try_advisory_lock($1)", key).Scan(&lockAcquired))
191+
return lockAcquired
192+
}
193+
194+
// Start a transaction to acquire the lock so we can later release the
195+
// lock by rolling back.
196+
execTx, err := exec.Begin(ctx)
197+
require.NoError(t, err)
198+
199+
// Acquire the advisory lock on the main test transaction.
200+
_, err = execTx.PGAdvisoryXactLock(ctx, key)
201+
require.NoError(t, err)
202+
203+
// Start another test transaction unrelated to the first.
204+
otherExec, _ := executorWithTx(ctx, t)
205+
206+
// The other test transaction is unable to acquire the lock because the
207+
// first test transaction holds it.
208+
require.False(t, tryAcquireLock(otherExec))
209+
210+
// Roll back the first test transaction to release the lock.
211+
require.NoError(t, execTx.Rollback(ctx))
212+
213+
// The other test transaction can now acquire the lock.
214+
require.True(t, tryAcquireLock(otherExec))
215+
})
216+
217+
t.Run("QueryRow", func(t *testing.T) {
218+
t.Parallel()
219+
220+
exec := setup(ctx, t)
221+
222+
var (
223+
field1 int
224+
field2 int
225+
field3 int
226+
fieldFoo string
227+
)
228+
229+
err := exec.QueryRow(ctx, "SELECT 1, 2, 3, 'foo'").Scan(&field1, &field2, &field3, &fieldFoo)
230+
require.NoError(t, err)
231+
232+
require.Equal(t, 1, field1)
233+
require.Equal(t, 2, field2)
234+
require.Equal(t, 3, field3)
235+
require.Equal(t, "foo", fieldFoo)
236+
})
237+
}

0 commit comments

Comments
 (0)