Skip to content

Commit fc2d195

Browse files
authored
Use json_each to make SQLite single inserts batch inserts (#1276)
Back when I was first implementing SQLite, the sqlc support was so catastrophically bad that many, many basic utilities just could not be used in queries. I tried like a half dozen different ways, but couldn't find anything that was both supported by SQLite and sqlc, so I ended up making a number of batch inserts into single inserts that get looped over in the driver. This isn't as bad in SQLite because usually your inserting to a local database (i.e. no network connections), but it's still not ideal in that it's probably somewhat slower, and deviates from the Postgres implementation. Sqlc's made some SQLite improvements since then, and I found that I'm now able to get a batch insert working `json_each` and `json_extract`. Here, go through the existing SQLite driver function and update things to use batch inserts wherever possible. Fixed operations: * `JobInsertFastMany` * `JobInsertFastManyNoReturning` * `JobInsertFullMany` * `MigrationInsertMany` * `MigrationInsertManyAssumingMain`
1 parent d6ac75a commit fc2d195

6 files changed

Lines changed: 593 additions & 286 deletions

File tree

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## [Unreleased]
99

10+
### Changed
11+
12+
- Change SQLite driver operations over to use bulk inserts where possible now that sqlc has better support for `json_each`. [PR #1276](https://github.com/riverqueue/river/pull/1276)
13+
1014
### Fixed
1115

1216
- Fix `JobCancel` having no effect on running jobs when using a poll-only driver (e.g. `riverdatabasesql`). The `controlActionCancel` event was silently dropped in `fetchAndRunLoop`'s `queueControlCh` handler instead of being forwarded to `maybeCancelJob`. Note: this fix only works within a single process; cross-process cancels in poll-only setups must wait for the next poll cycle. [PR #1245](https://github.com/riverqueue/river/pull/1245).

riverdriver/riversqlite/internal/dbsqlc/river_job.sql

Lines changed: 138 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -189,15 +189,6 @@ WHERE state = 'running'
189189
ORDER BY id
190190
LIMIT @max;
191191

192-
-- Insert a job.
193-
--
194-
-- This is supposed to be a batch insert, but various limitations of the
195-
-- combined SQLite + sqlc has left me unable to find a way of injecting many
196-
-- arguments en masse (like how we slightly abuse arrays to pull it off for the
197-
-- Postgres drivers), so we loop over many insert operations instead, with the
198-
-- expectation that this may be fixable in the future. Because SQLite targets
199-
-- will often be local and therefore with a very minimal round trip compared to
200-
-- a network, looping over operations is probably okay performance-wise.
201192
-- name: JobInsertFast :one
202193
INSERT INTO /* TEMPLATE: schema */river_job(
203194
id,
@@ -246,6 +237,56 @@ ON CONFLICT (unique_key)
246237
DO UPDATE SET kind = EXCLUDED.kind
247238
RETURNING *;
248239

240+
-- name: JobInsertFastMany :many
241+
INSERT INTO /* TEMPLATE: schema */river_job(
242+
id,
243+
args,
244+
created_at,
245+
kind,
246+
max_attempts,
247+
metadata,
248+
priority,
249+
queue,
250+
scheduled_at,
251+
state,
252+
tags,
253+
unique_key,
254+
unique_states
255+
)
256+
SELECT
257+
cast(json_extract(value, '$.id') AS integer),
258+
json(cast(json_extract(value, '$.args') AS blob)),
259+
coalesce(cast(json_extract(value, '$.created_at') AS text), datetime('now', 'subsec')),
260+
cast(json_extract(value, '$.kind') AS text),
261+
cast(json_extract(value, '$.max_attempts') AS integer),
262+
json(cast(json_extract(value, '$.metadata') AS blob)),
263+
cast(json_extract(value, '$.priority') AS integer),
264+
cast(json_extract(value, '$.queue') AS text),
265+
coalesce(cast(json_extract(value, '$.scheduled_at') AS text), datetime('now', 'subsec')),
266+
cast(json_extract(value, '$.state') AS text),
267+
json(cast(json_extract(value, '$.tags') AS blob)),
268+
CASE WHEN length(cast(json_extract(value, '$.unique_key') AS text)) = 0 THEN NULL ELSE unhex(cast(json_extract(value, '$.unique_key') AS text)) END,
269+
nullif(cast(json_extract(value, '$.unique_states') AS integer), 0)
270+
FROM json_each(cast(@jobs AS blob))
271+
WHERE true
272+
ON CONFLICT (unique_key)
273+
WHERE unique_key IS NOT NULL
274+
AND unique_states IS NOT NULL
275+
AND CASE state
276+
WHEN 'available' THEN unique_states & (1 << 0)
277+
WHEN 'cancelled' THEN unique_states & (1 << 1)
278+
WHEN 'completed' THEN unique_states & (1 << 2)
279+
WHEN 'discarded' THEN unique_states & (1 << 3)
280+
WHEN 'pending' THEN unique_states & (1 << 4)
281+
WHEN 'retryable' THEN unique_states & (1 << 5)
282+
WHEN 'running' THEN unique_states & (1 << 6)
283+
WHEN 'scheduled' THEN unique_states & (1 << 7)
284+
ELSE 0
285+
END >= 1
286+
-- Something needs to be updated for a row to be returned on a conflict.
287+
DO UPDATE SET kind = EXCLUDED.kind
288+
RETURNING *;
289+
249290
-- name: JobInsertFastNoReturning :execrows
250291
INSERT INTO /* TEMPLATE: schema */river_job(
251292
args,
@@ -290,6 +331,52 @@ ON CONFLICT (unique_key)
290331
END >= 1
291332
DO NOTHING;
292333

334+
-- name: JobInsertFastManyNoReturning :execrows
335+
INSERT INTO /* TEMPLATE: schema */river_job(
336+
args,
337+
created_at,
338+
kind,
339+
max_attempts,
340+
metadata,
341+
priority,
342+
queue,
343+
scheduled_at,
344+
state,
345+
tags,
346+
unique_key,
347+
unique_states
348+
)
349+
SELECT
350+
json(cast(json_extract(value, '$.args') AS blob)),
351+
coalesce(cast(json_extract(value, '$.created_at') AS text), datetime('now', 'subsec')),
352+
cast(json_extract(value, '$.kind') AS text),
353+
cast(json_extract(value, '$.max_attempts') AS integer),
354+
json(cast(json_extract(value, '$.metadata') AS blob)),
355+
cast(json_extract(value, '$.priority') AS integer),
356+
cast(json_extract(value, '$.queue') AS text),
357+
coalesce(cast(json_extract(value, '$.scheduled_at') AS text), datetime('now', 'subsec')),
358+
cast(json_extract(value, '$.state') AS text),
359+
json(cast(json_extract(value, '$.tags') AS blob)),
360+
CASE WHEN length(cast(json_extract(value, '$.unique_key') AS text)) = 0 THEN NULL ELSE unhex(cast(json_extract(value, '$.unique_key') AS text)) END,
361+
nullif(cast(json_extract(value, '$.unique_states') AS integer), 0)
362+
FROM json_each(cast(@jobs AS blob))
363+
WHERE true
364+
ON CONFLICT (unique_key)
365+
WHERE unique_key IS NOT NULL
366+
AND unique_states IS NOT NULL
367+
AND CASE state
368+
WHEN 'available' THEN unique_states & (1 << 0)
369+
WHEN 'cancelled' THEN unique_states & (1 << 1)
370+
WHEN 'completed' THEN unique_states & (1 << 2)
371+
WHEN 'discarded' THEN unique_states & (1 << 3)
372+
WHEN 'pending' THEN unique_states & (1 << 4)
373+
WHEN 'retryable' THEN unique_states & (1 << 5)
374+
WHEN 'running' THEN unique_states & (1 << 6)
375+
WHEN 'scheduled' THEN unique_states & (1 << 7)
376+
ELSE 0
377+
END >= 1
378+
DO NOTHING;
379+
293380
-- name: JobInsertFull :one
294381
INSERT INTO /* TEMPLATE: schema */river_job(
295382
args,
@@ -329,6 +416,47 @@ INSERT INTO /* TEMPLATE: schema */river_job(
329416
@unique_states
330417
) RETURNING *;
331418

419+
-- name: JobInsertFullMany :many
420+
INSERT INTO /* TEMPLATE: schema */river_job(
421+
args,
422+
attempt,
423+
attempted_at,
424+
attempted_by,
425+
created_at,
426+
errors,
427+
finalized_at,
428+
kind,
429+
max_attempts,
430+
metadata,
431+
priority,
432+
queue,
433+
scheduled_at,
434+
state,
435+
tags,
436+
unique_key,
437+
unique_states
438+
)
439+
SELECT
440+
json(cast(json_extract(value, '$.args') AS blob)),
441+
cast(json_extract(value, '$.attempt') AS integer),
442+
cast(json_extract(value, '$.attempted_at') AS text),
443+
CASE WHEN json_type(value, '$.attempted_by') IS NULL THEN NULL ELSE json(cast(json_extract(value, '$.attempted_by') AS blob)) END,
444+
coalesce(cast(json_extract(value, '$.created_at') AS text), datetime('now', 'subsec')),
445+
CASE WHEN json_type(value, '$.errors') IS NULL THEN NULL ELSE json(cast(json_extract(value, '$.errors') AS blob)) END,
446+
cast(json_extract(value, '$.finalized_at') AS text),
447+
cast(json_extract(value, '$.kind') AS text),
448+
cast(json_extract(value, '$.max_attempts') AS integer),
449+
json(cast(json_extract(value, '$.metadata') AS blob)),
450+
cast(json_extract(value, '$.priority') AS integer),
451+
cast(json_extract(value, '$.queue') AS text),
452+
coalesce(cast(json_extract(value, '$.scheduled_at') AS text), datetime('now', 'subsec')),
453+
cast(json_extract(value, '$.state') AS text),
454+
json(cast(json_extract(value, '$.tags') AS blob)),
455+
CASE WHEN length(cast(json_extract(value, '$.unique_key') AS text)) = 0 THEN NULL ELSE unhex(cast(json_extract(value, '$.unique_key') AS text)) END,
456+
nullif(cast(json_extract(value, '$.unique_states') AS integer), 0)
457+
FROM json_each(cast(@jobs AS blob))
458+
RETURNING *;
459+
332460
-- name: JobKindList :many
333461
SELECT DISTINCT kind
334462
FROM /* TEMPLATE: schema */river_job
@@ -513,4 +641,4 @@ SET
513641
metadata = CASE WHEN cast(@metadata_do_update AS boolean) THEN json(cast(@metadata AS blob)) ELSE metadata END,
514642
state = CASE WHEN cast(@state_do_update AS boolean) THEN @state ELSE state END
515643
WHERE id = @id
516-
RETURNING *;
644+
RETURNING *;

0 commit comments

Comments
 (0)