Skip to content

Commit 8fc24ba

Browse files
committed
Defer dataclip search_vector indexing off the insert path (#4800)
The dataclips AFTER INSERT trigger built search_vector with jsonb_to_tsvector on the synchronous insert path. For large bodies under load this could hold the connection past the timeout and roll back the insert, so the dataclip was never saved and the run's following events cascade-failed - losing the run. Move vector building off the insert path, mirroring the log_lines approach: - safe_jsonb_to_tsvector(regconfig, jsonb): COALESCE(body,'{}') so a NULL/wiped body yields ''::tsvector (never NULL, so it can't stick in the pending index), catching program_limit_exceeded -> ''::tsvector. - Partial index dataclips_pending_search_idx over (inserted_at) WHERE search_vector IS NULL, built CONCURRENTLY (dataclips is unpartitioned). - Drop the set_search_vector trigger and update_dataclip_search_vector function (down restores the program_limit_exceeded-catching version). - DataclipSearchVectorWorker on a dedicated dataclip_search_indexing queue (concurrency 1) drains pending rows newest-first with FOR UPDATE SKIP LOCKED, snowballing when its per-run budget is exhausted, otherwise minute-ly cron. Uses english_nostop to match the read side (Lightning.Invocation). Dataclip search is now eventually consistent. The insert no longer blocks on or rolls back from vector building.
1 parent 6331486 commit 8fc24ba

8 files changed

Lines changed: 417 additions & 2 deletions

CHANGELOG.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,13 @@ and this project adheres to
2121

2222
### Fixed
2323

24+
- Dataclip inserts no longer roll back when building the full-text search vector
25+
is slow. The `jsonb_to_tsvector` work that ran in an `AFTER INSERT` trigger
26+
could hold the connection past the timeout and roll back the insert, losing
27+
the whole run. The search vector is now built off the insert path by a
28+
background `Lightning.Invocation.DataclipSearchVectorWorker` (dedicated
29+
`dataclip_search_indexing` queue), making dataclip search eventually
30+
consistent. [#4800](https://github.com/OpenFn/lightning/issues/4800)
2431
- Channel join crashes when multiple users open the same workflow concurrently
2532
[#4802](https://github.com/OpenFn/lightning/issues/4802)
2633
- Fix `purge_deleted` Oban job crashing when a soft-deleted project has

lib/lightning/config/bootstrap.ex

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -270,7 +270,8 @@ defmodule Lightning.Config.Bootstrap do
270270
args: %{"type" => "monthly_project_digest"}},
271271
# TODO - move this into an ENV?
272272
{"17 */2 * * *", Lightning.Projects, args: %{"type" => "data_retention"}},
273-
{"*/10 * * * *", Lightning.KafkaTriggers.DuplicateTrackingCleanupWorker}
273+
{"*/10 * * * *", Lightning.KafkaTriggers.DuplicateTrackingCleanupWorker},
274+
{"* * * * *", Lightning.Invocation.DataclipSearchVectorWorker}
274275
]
275276

276277
cleanup_cron =
@@ -300,7 +301,8 @@ defmodule Lightning.Config.Bootstrap do
300301
workflow_failures: 1,
301302
background: 1,
302303
history_exports: 1,
303-
ai_assistant: 10
304+
ai_assistant: 10,
305+
dataclip_search_indexing: 1
304306
]
305307

306308
# https://plausible.io/ is an open-source, privacy-friendly alternative to
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
defmodule Lightning.Invocation.DataclipSearchVectorWorker do
2+
@moduledoc """
3+
Backfills the full-text `search_vector` on `dataclips` rows.
4+
5+
Dataclips are inserted with `search_vector` left `NULL`; the vector is built
6+
here rather than on the insert path. Building it inline was risky:
7+
`jsonb_to_tsvector` over a large dataclip body is slow and runs inside the
8+
transaction that persists the run, so a slow (or failing) vector build could
9+
roll back the dataclip insert and lose the run (#4800). Deferring it keeps
10+
`jsonb_to_tsvector` off that hot path. Search is eventually consistent as a
11+
result, typically catching up within a minute.
12+
13+
Two database objects support this: `safe_jsonb_to_tsvector(regconfig, jsonb)`,
14+
which builds the vector from the dataclip body while tolerating NULL and
15+
oversized input, and a partial index over `search_vector IS NULL`, which keeps
16+
locating pending rows cheap as the table grows. Vectors use the
17+
`english_nostop` config to match the read side (`Lightning.Invocation`), which
18+
queries with `to_tsquery('english_nostop', ...)`.
19+
20+
Each run drains pending rows newest-first, in batches of `@batch_size` up to
21+
`@max_batches` per run. A run that exhausts its budget leaves backlog behind
22+
and enqueues an immediate follow-up ("snowball"); otherwise the minute-ly cron
23+
tick keeps pace. The worker runs on the dedicated `dataclip_search_indexing`
24+
queue at concurrency 1, so only one job executes at a time, and the cron tick
25+
and the snowball carry distinct `trigger` args, so job uniqueness allows one of
26+
each to queue but never a duplicate. The queue is separate from the log_lines
27+
`search_indexing` queue so dataclip indexing can't be starved by a log-line
28+
backlog.
29+
"""
30+
31+
use Oban.Worker,
32+
queue: :dataclip_search_indexing,
33+
priority: 1,
34+
max_attempts: 10,
35+
# Restrict uniqueness to queued states. Oban's defaults also dedup against
36+
# :executing/:completed, so a running snowball would match itself and fail
37+
# to enqueue its successor — breaking the chain after one hop.
38+
unique: [period: 55, keys: [:trigger], states: [:available, :scheduled]]
39+
40+
alias Lightning.Repo
41+
42+
require Logger
43+
44+
@batch_size 2_500
45+
# Per-run budget.
46+
@max_batches 10
47+
48+
@drain_sql """
49+
WITH pending AS (
50+
SELECT id FROM dataclips
51+
WHERE search_vector IS NULL
52+
ORDER BY inserted_at DESC
53+
LIMIT $1 FOR UPDATE SKIP LOCKED
54+
)
55+
UPDATE dataclips d
56+
SET search_vector =
57+
safe_jsonb_to_tsvector('public.english_nostop'::regconfig, d.body)
58+
FROM pending p WHERE d.id = p.id
59+
"""
60+
61+
@impl Oban.Worker
62+
def perform(%Oban.Job{}) do
63+
{filled, budget_exhausted?} = drain(0, 0)
64+
65+
Logger.info(fn ->
66+
# coveralls-ignore-start
67+
"Invocation.DataclipSearchVectorWorker filled #{filled} search_vector row(s)."
68+
# coveralls-ignore-stop
69+
end)
70+
71+
if budget_exhausted? do
72+
# Budget exhausted, so backlog likely remains: enqueue an immediate
73+
# follow-up rather than waiting for the next cron tick.
74+
Oban.insert(Lightning.Oban, __MODULE__.new(%{"trigger" => "snowball"}))
75+
end
76+
77+
{:ok, filled}
78+
end
79+
80+
# Drains up to @max_batches batches, accumulating the number of rows filled.
81+
# Returns {filled, budget_exhausted?}. Stops early when a batch fills fewer
82+
# than @batch_size rows (backlog drained).
83+
defp drain(filled, batches) when batches >= @max_batches do
84+
{filled, true}
85+
end
86+
87+
defp drain(filled, batches) do
88+
%{num_rows: num_rows} = Repo.query!(@drain_sql, [@batch_size])
89+
90+
if num_rows < @batch_size do
91+
{filled + num_rows, false}
92+
else
93+
drain(filled + num_rows, batches + 1)
94+
end
95+
end
96+
end
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
defmodule Lightning.Repo.Migrations.AddSafeJsonbToTsvectorFunction do
2+
use Ecto.Migration
3+
4+
def up do
5+
# Deliberately not STRICT: a STRICT function returns NULL for a NULL doc,
6+
# which would leave search_vector NULL forever and stuck in the pending
7+
# index (e.g. a wiped dataclip with a NULL body). COALESCE instead so the
8+
# result is always a non-NULL tsvector.
9+
execute("""
10+
CREATE OR REPLACE FUNCTION safe_jsonb_to_tsvector(config regconfig, doc jsonb)
11+
RETURNS tsvector LANGUAGE plpgsql IMMUTABLE AS $$
12+
BEGIN
13+
RETURN jsonb_to_tsvector(config, COALESCE(doc, '{}'::jsonb), '"all"');
14+
EXCEPTION WHEN program_limit_exceeded THEN
15+
RETURN ''::tsvector;
16+
END;
17+
$$;
18+
""")
19+
end
20+
21+
def down do
22+
execute("DROP FUNCTION IF EXISTS safe_jsonb_to_tsvector(regconfig, jsonb);")
23+
end
24+
end
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
defmodule Lightning.Repo.Migrations.AddDataclipsPendingSearchIndex do
2+
use Ecto.Migration
3+
4+
@disable_ddl_transaction true
5+
@disable_migration_lock true
6+
7+
def up do
8+
# Partial index that keeps "find pending rows" cheap for the background
9+
# indexing worker. dataclips is an unpartitioned table, so this is a single
10+
# CONCURRENTLY-built index (no per-partition build-and-attach dance). The
11+
# index stays small because existing rows already have a non-NULL
12+
# search_vector, so only freshly-inserted, not-yet-indexed rows match.
13+
execute("""
14+
CREATE INDEX CONCURRENTLY IF NOT EXISTS dataclips_pending_search_idx
15+
ON dataclips (inserted_at)
16+
WHERE search_vector IS NULL
17+
""")
18+
end
19+
20+
def down do
21+
execute("DROP INDEX CONCURRENTLY IF EXISTS dataclips_pending_search_idx")
22+
end
23+
end
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
defmodule Lightning.Repo.Migrations.DropDataclipSearchVectorTrigger do
2+
use Ecto.Migration
3+
4+
def up do
5+
execute("SET lock_timeout = '5s'")
6+
execute("DROP TRIGGER IF EXISTS set_search_vector ON dataclips")
7+
execute("DROP FUNCTION IF EXISTS update_dataclip_search_vector()")
8+
end
9+
10+
def down do
11+
# Restore the program_limit_exceeded-catching version of the function
12+
# (from 20250219122902), not the original naive one.
13+
execute("""
14+
CREATE OR REPLACE FUNCTION update_dataclip_search_vector()
15+
RETURNS trigger
16+
LANGUAGE plpgsql
17+
AS $function$
18+
BEGIN
19+
BEGIN
20+
UPDATE dataclips
21+
SET search_vector = jsonb_to_tsvector('english_nostop', body, '"all"')
22+
WHERE id = NEW.id;
23+
EXCEPTION
24+
WHEN program_limit_exceeded THEN
25+
RAISE NOTICE 'Message too long for tsvector at id: %. Error: %', NEW.id, SQLERRM;
26+
END;
27+
28+
RETURN NEW;
29+
END;
30+
$function$;
31+
""")
32+
33+
execute("""
34+
CREATE TRIGGER set_search_vector
35+
AFTER INSERT ON dataclips FOR EACH ROW
36+
WHEN (NEW."search_vector" IS NULL)
37+
EXECUTE PROCEDURE update_dataclip_search_vector();
38+
""")
39+
end
40+
end

0 commit comments

Comments
 (0)