Skip to content

Commit 87c0c50

Browse files
authored
Mark schema template replacements as Stable to enable Replacer cache (#1242)
* Mark schema template replacements as Stable to enable Replacer cache schemaTemplateParam (and the schema replacement in ColumnExists) never set Stable: true, so the sqlctemplate.Replacer cache introduced in #802 is bypassed on every query. This causes regexp.ReplaceAllStringFunc to run on every SQL query, allocating a new string each time. Under production throughput this creates enough allocation pressure to cause visible memory growth — we observed +29MB in 16 minutes on a worker running river.Start() with continuous job polling. The schema value comes from the client config and is constant for the lifetime of a River client, so marking it Stable is safe. With this change, the regex runs once per unique SQL string and then serves from cache. Fixes all three drivers: riverpgxv5, riverdatabasesql, riversqlite. Fixes #1241 * Add entry to changelog * revert Stable: true in ColumExists in river_pgx_v5_driver
1 parent d8c3909 commit 87c0c50

7 files changed

Lines changed: 172 additions & 4 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
99

1010
### Fixed
1111

12+
- Mark schema replacements as `Stable` in sqlc templates, preventing query SQL from having to be reallocated over and over again.. [PR #1242](https://github.com/riverqueue/river/pull/1242).
1213
- Fix unsafe concurrent producer map access in client. [PR #1236](https://github.com/riverqueue/river/pull/1236).
1314
- Fix bug in `sqltemplate` cached path in order in which named args are passed to a query (previously, the order was unstable). [PR #1243](https://github.com/riverqueue/river/pull/1243).
1415

riverdriver/riverdatabasesql/river_database_sql_driver.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ func (e *Executor) ColumnExists(ctx context.Context, params *riverdriver.ColumnE
126126
schema = "'" + params.Schema + "'"
127127
}
128128
ctx = sqlctemplate.WithReplacements(ctx, map[string]sqlctemplate.Replacement{
129-
"schema": {Value: schema},
129+
"schema": {Value: schema, Stable: true},
130130
}, nil)
131131

132132
exists, err := dbsqlc.New().ColumnExists(ctx, e.dbtx, &dbsqlc.ColumnExistsParams{
@@ -1247,6 +1247,6 @@ func schemaTemplateParam(ctx context.Context, schema string) context.Context {
12471247
}
12481248

12491249
return sqlctemplate.WithReplacements(ctx, map[string]sqlctemplate.Replacement{
1250-
"schema": {Value: schema},
1250+
"schema": {Value: schema, Stable: true},
12511251
}, nil)
12521252
}

riverdriver/riverdatabasesql/river_database_sql_driver_test.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,4 +115,59 @@ func TestSchemaTemplateParam(t *testing.T) {
115115
require.NoError(t, err)
116116
require.Equal(t, `SELECT 1 FROM "custom_schema".river_job`, updatedSQL)
117117
})
118+
119+
t.Run("SchemaReplacementIsStable", func(t *testing.T) {
120+
t.Parallel()
121+
122+
replacer, bundle := setup(t)
123+
124+
const sql = "SELECT 1 FROM /* TEMPLATE: schema */river_job"
125+
126+
updatedSQL1, _, err := replacer.RunSafely(
127+
schemaTemplateParam(ctx, "my_schema"),
128+
bundle.driver.ArgPlaceholder(),
129+
sql,
130+
nil,
131+
)
132+
require.NoError(t, err)
133+
require.Equal(t, `SELECT 1 FROM "my_schema".river_job`, updatedSQL1)
134+
135+
// Second call with same SQL + same schema produces identical result.
136+
// Because schema is marked Stable, the Replacer caches the output
137+
// after the first call and short-circuits regex on subsequent calls.
138+
updatedSQL2, _, err := replacer.RunSafely(
139+
schemaTemplateParam(ctx, "my_schema"),
140+
bundle.driver.ArgPlaceholder(),
141+
sql,
142+
nil,
143+
)
144+
require.NoError(t, err)
145+
require.Equal(t, updatedSQL1, updatedSQL2)
146+
})
147+
148+
t.Run("EmptySchemaReplacementIsStable", func(t *testing.T) {
149+
t.Parallel()
150+
151+
replacer, bundle := setup(t)
152+
153+
const sql = "SELECT 1 FROM /* TEMPLATE: schema */river_job"
154+
155+
updatedSQL1, _, err := replacer.RunSafely(
156+
schemaTemplateParam(ctx, ""),
157+
bundle.driver.ArgPlaceholder(),
158+
sql,
159+
nil,
160+
)
161+
require.NoError(t, err)
162+
require.Equal(t, "SELECT 1 FROM river_job", updatedSQL1)
163+
164+
updatedSQL2, _, err := replacer.RunSafely(
165+
schemaTemplateParam(ctx, ""),
166+
bundle.driver.ArgPlaceholder(),
167+
sql,
168+
nil,
169+
)
170+
require.NoError(t, err)
171+
require.Equal(t, updatedSQL1, updatedSQL2)
172+
})
118173
}

riverdriver/riverpgxv5/river_pgx_v5_driver.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1310,6 +1310,6 @@ func schemaTemplateParam(ctx context.Context, schema string) context.Context {
13101310
}
13111311

13121312
return sqlctemplate.WithReplacements(ctx, map[string]sqlctemplate.Replacement{
1313-
"schema": {Value: schema},
1313+
"schema": {Value: schema, Stable: true},
13141314
}, nil)
13151315
}

riverdriver/riverpgxv5/river_pgx_v5_driver_test.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,63 @@ func TestSchemaTemplateParam(t *testing.T) {
238238
require.NoError(t, err)
239239
require.Equal(t, `SELECT 1 FROM "custom_schema".river_job`, updatedSQL)
240240
})
241+
242+
t.Run("SchemaReplacementIsStable", func(t *testing.T) {
243+
t.Parallel()
244+
245+
replacer, bundle := setup(t)
246+
247+
const sql = "SELECT 1 FROM /* TEMPLATE: schema */river_job"
248+
249+
// First call
250+
updatedSQL1, _, err := replacer.RunSafely(
251+
schemaTemplateParam(ctx, "my_schema"),
252+
bundle.driver.ArgPlaceholder(),
253+
sql,
254+
nil,
255+
)
256+
require.NoError(t, err)
257+
require.Equal(t, `SELECT 1 FROM "my_schema".river_job`, updatedSQL1)
258+
259+
// Second call with same SQL + same schema produces identical result.
260+
// Because schema is marked Stable, the Replacer caches the output
261+
// after the first call and short-circuits regex on subsequent calls.
262+
updatedSQL2, _, err := replacer.RunSafely(
263+
schemaTemplateParam(ctx, "my_schema"),
264+
bundle.driver.ArgPlaceholder(),
265+
sql,
266+
nil,
267+
)
268+
require.NoError(t, err)
269+
require.Equal(t, updatedSQL1, updatedSQL2)
270+
})
271+
272+
t.Run("EmptySchemaReplacementIsStable", func(t *testing.T) {
273+
t.Parallel()
274+
275+
replacer, bundle := setup(t)
276+
277+
const sql = "SELECT 1 FROM /* TEMPLATE: schema */river_job"
278+
279+
updatedSQL1, _, err := replacer.RunSafely(
280+
schemaTemplateParam(ctx, ""),
281+
bundle.driver.ArgPlaceholder(),
282+
sql,
283+
nil,
284+
)
285+
require.NoError(t, err)
286+
require.Equal(t, "SELECT 1 FROM river_job", updatedSQL1)
287+
288+
// Repeat — same result from cache
289+
updatedSQL2, _, err := replacer.RunSafely(
290+
schemaTemplateParam(ctx, ""),
291+
bundle.driver.ArgPlaceholder(),
292+
sql,
293+
nil,
294+
)
295+
require.NoError(t, err)
296+
require.Equal(t, updatedSQL1, updatedSQL2)
297+
})
241298
}
242299

243300
type nilConnDBTX struct{}

riverdriver/riversqlite/river_sqlite_driver.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1680,7 +1680,7 @@ func schemaTemplateParam(ctx context.Context, schema string) context.Context {
16801680
}
16811681

16821682
return sqlctemplate.WithReplacements(ctx, map[string]sqlctemplate.Replacement{
1683-
"schema": {Value: schema},
1683+
"schema": {Value: schema, Stable: true},
16841684
}, nil)
16851685
}
16861686

riverdriver/riversqlite/river_sqlite_driver_test.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,4 +94,59 @@ func TestSchemaTemplateParam(t *testing.T) {
9494
require.NoError(t, err)
9595
require.Equal(t, `SELECT 1 FROM "custom_schema".river_job`, updatedSQL)
9696
})
97+
98+
t.Run("SchemaReplacementIsStable", func(t *testing.T) {
99+
t.Parallel()
100+
101+
replacer, bundle := setup(t)
102+
103+
const sql = "SELECT 1 FROM /* TEMPLATE: schema */river_job"
104+
105+
updatedSQL1, _, err := replacer.RunSafely(
106+
schemaTemplateParam(ctx, "my_schema"),
107+
bundle.driver.ArgPlaceholder(),
108+
sql,
109+
nil,
110+
)
111+
require.NoError(t, err)
112+
require.Equal(t, `SELECT 1 FROM "my_schema".river_job`, updatedSQL1)
113+
114+
// Second call with same SQL + same schema produces identical result.
115+
// Because schema is marked Stable, the Replacer caches the output
116+
// after the first call and short-circuits regex on subsequent calls.
117+
updatedSQL2, _, err := replacer.RunSafely(
118+
schemaTemplateParam(ctx, "my_schema"),
119+
bundle.driver.ArgPlaceholder(),
120+
sql,
121+
nil,
122+
)
123+
require.NoError(t, err)
124+
require.Equal(t, updatedSQL1, updatedSQL2)
125+
})
126+
127+
t.Run("EmptySchemaReplacementIsStable", func(t *testing.T) {
128+
t.Parallel()
129+
130+
replacer, bundle := setup(t)
131+
132+
const sql = "SELECT 1 FROM /* TEMPLATE: schema */river_job"
133+
134+
updatedSQL1, _, err := replacer.RunSafely(
135+
schemaTemplateParam(ctx, ""),
136+
bundle.driver.ArgPlaceholder(),
137+
sql,
138+
nil,
139+
)
140+
require.NoError(t, err)
141+
require.Equal(t, "SELECT 1 FROM river_job", updatedSQL1)
142+
143+
updatedSQL2, _, err := replacer.RunSafely(
144+
schemaTemplateParam(ctx, ""),
145+
bundle.driver.ArgPlaceholder(),
146+
sql,
147+
nil,
148+
)
149+
require.NoError(t, err)
150+
require.Equal(t, updatedSQL1, updatedSQL2)
151+
})
97152
}

0 commit comments

Comments
 (0)