From 3b3116fd9442a84bf4bc4240e530e3a01632274a Mon Sep 17 00:00:00 2001 From: Stuart Corbishley Date: Sat, 30 May 2026 12:25:58 +0200 Subject: [PATCH 1/5] Defer log_lines search_vector indexing off the insert path The AFTER INSERT trigger on log_lines computed to_tsvector synchronously inside the insert transaction (it never actually deferred anything) and double-wrote every row via a self-UPDATE. It also lacked the program_limit_exceeded guard dataclips got, so an oversized message aborted the insert (rolling back the whole batch for insert_all). Remove the trigger so inserts leave search_vector NULL, and backfill it out-of-band via a new Oban worker (Lightning.LogLines.SearchVectorWorker) on a dedicated search_indexing queue. Adds a guarded safe_to_tsvector SQL function and a partial index (WHERE search_vector IS NULL) so draining pending rows stays cheap. Covers both the single (run:log) and batch (run:batch_logs) insert paths. Live log streaming is unaffected (push-based via PubSub); only full-text log search lags slightly behind ingestion. 
--- lib/lightning/config/bootstrap.ex | 6 +- .../log_lines/search_vector_worker.ex | 101 ++++++++++++ ...30091125_add_safe_to_tsvector_function.exs | 20 +++ ...126_add_log_lines_pending_search_index.exs | 47 ++++++ ...7_drop_log_lines_search_vector_trigger.exs | 30 ++++ .../log_lines/search_vector_worker_test.exs | 146 ++++++++++++++++++ test/lightning/runs_test.exs | 86 +++++++++++ 7 files changed, 434 insertions(+), 2 deletions(-) create mode 100644 lib/lightning/log_lines/search_vector_worker.ex create mode 100644 priv/repo/migrations/20260530091125_add_safe_to_tsvector_function.exs create mode 100644 priv/repo/migrations/20260530091126_add_log_lines_pending_search_index.exs create mode 100644 priv/repo/migrations/20260530091127_drop_log_lines_search_vector_trigger.exs create mode 100644 test/lightning/log_lines/search_vector_worker_test.exs diff --git a/lib/lightning/config/bootstrap.ex b/lib/lightning/config/bootstrap.ex index cb7003eb39b..69daccf456f 100644 --- a/lib/lightning/config/bootstrap.ex +++ b/lib/lightning/config/bootstrap.ex @@ -270,7 +270,8 @@ defmodule Lightning.Config.Bootstrap do args: %{"type" => "monthly_project_digest"}}, # TODO - move this into an ENV? 
{"17 */2 * * *", Lightning.Projects, args: %{"type" => "data_retention"}}, - {"*/10 * * * *", Lightning.KafkaTriggers.DuplicateTrackingCleanupWorker} + {"*/10 * * * *", Lightning.KafkaTriggers.DuplicateTrackingCleanupWorker}, + {"* * * * *", Lightning.LogLines.SearchVectorWorker} ] cleanup_cron = @@ -300,7 +301,8 @@ defmodule Lightning.Config.Bootstrap do workflow_failures: 1, background: 1, history_exports: 1, - ai_assistant: 10 + ai_assistant: 10, + search_indexing: 1 ] # https://plausible.io/ is an open-source, privacy-friendly alternative to diff --git a/lib/lightning/log_lines/search_vector_worker.ex b/lib/lightning/log_lines/search_vector_worker.ex new file mode 100644 index 00000000000..cf89a902eb1 --- /dev/null +++ b/lib/lightning/log_lines/search_vector_worker.ex @@ -0,0 +1,101 @@ +defmodule Lightning.LogLines.SearchVectorWorker do + @moduledoc """ + Asynchronously backfills `log_lines.search_vector` for rows that were left + `NULL` at insert time. + + ## Why defer the tsvector? + + Computing the full-text `search_vector` synchronously (via an insert trigger) + put `to_tsvector` on the hot path of every log line write. Under heavy run + load that work serialises behind the worker's log firehose and slows + ingestion. A sibling migration removes the synchronous trigger, leaving the + column `NULL` on insert, and adds: + + * a `safe_to_tsvector(regconfig, text)` SQL function (tolerant of bad input); + * a partial index `... WHERE search_vector IS NULL` so finding pending rows + stays cheap. + + This worker then fills `search_vector` out-of-band. The read side + (`Lightning.Invocation`) queries with `to_tsquery('english_nostop', ...)`, so + this worker MUST build vectors with the matching `english_nostop` config, + otherwise searches would silently miss freshly-written log lines. + + ## Draining and snowballing + + Each run drains pending rows in bounded batches (`@batch_size` rows, up to + `@max_batches` per run). 
When a run consumes its full budget there is almost + certainly more backlog, so it enqueues an immediate follow-up job (a + "snowball") rather than waiting for the next 1-minute cron tick. This lets the + worker keep pace with bursty load while the dedicated `search_indexing` queue + (concurrency 1) plus job uniqueness keep the snowball self-limiting. + + The cron entry enqueues with default args; the snowball uses + `%{"trigger" => "snowball"}`. The differing `trigger` key produces a distinct + uniqueness key, so a queued snowball is never swallowed by the cron job (and + vice versa). + """ + + use Oban.Worker, + queue: :search_indexing, + priority: 1, + max_attempts: 10, + unique: [period: 55, keys: [:trigger]] + + alias Lightning.Repo + + require Logger + + # Rows to fill per batch. + @batch_size 2_500 + # Maximum batches to drain in a single run (per-run budget). + @max_batches 10 + + @drain_sql """ + WITH pending AS ( + SELECT id, run_id FROM log_lines + WHERE search_vector IS NULL + ORDER BY timestamp DESC + LIMIT $1 FOR UPDATE SKIP LOCKED + ) + UPDATE log_lines l + SET search_vector = safe_to_tsvector('public.english_nostop'::regconfig, l.message) + FROM pending p WHERE l.id = p.id AND l.run_id = p.run_id + """ + + @impl Oban.Worker + def perform(%Oban.Job{}) do + {filled, budget_exhausted?} = drain(0, 0) + + Logger.info(fn -> + # coveralls-ignore-start + "LogLines.SearchVectorWorker filled #{filled} search_vector row(s)." + # coveralls-ignore-stop + end) + + if budget_exhausted? do + # The run hit its per-run budget, so more backlog almost certainly + # remains. Snowball an immediate follow-up with a distinct uniqueness key + # so the cron job's uniqueness does not swallow it. + Oban.insert(Lightning.Oban, __MODULE__.new(%{"trigger" => "snowball"})) + end + + {:ok, filled} + end + + # Drains up to @max_batches batches, accumulating the number of rows filled. + # Returns {filled, budget_exhausted?}. 
Stops early when a batch fills fewer + # than @batch_size rows (backlog drained). + defp drain(filled, batches) when batches >= @max_batches do + {filled, true} + end + + defp drain(filled, batches) do + %{num_rows: num_rows} = Repo.query!(@drain_sql, [@batch_size]) + + if num_rows < @batch_size do + {filled + num_rows, false} + else + drain(filled + num_rows, batches + 1) + end + end +end diff --git a/priv/repo/migrations/20260530091125_add_safe_to_tsvector_function.exs b/priv/repo/migrations/20260530091125_add_safe_to_tsvector_function.exs new file mode 100644 index 00000000000..a63b41c109b --- /dev/null +++ b/priv/repo/migrations/20260530091125_add_safe_to_tsvector_function.exs @@ -0,0 +1,20 @@ +defmodule Lightning.Repo.Migrations.AddSafeToTsvectorFunction do + use Ecto.Migration + + def up do + execute(""" + CREATE FUNCTION safe_to_tsvector(config regconfig, doc text) RETURNS tsvector + LANGUAGE plpgsql IMMUTABLE STRICT AS $$ + BEGIN + RETURN to_tsvector(config, doc); + EXCEPTION WHEN program_limit_exceeded THEN + RETURN ''::tsvector; + END; + $$; + """) + end + + def down do + execute("DROP FUNCTION IF EXISTS safe_to_tsvector(regconfig, text);") + end +end diff --git a/priv/repo/migrations/20260530091126_add_log_lines_pending_search_index.exs b/priv/repo/migrations/20260530091126_add_log_lines_pending_search_index.exs new file mode 100644 index 00000000000..a8b5d777a88 --- /dev/null +++ b/priv/repo/migrations/20260530091126_add_log_lines_pending_search_index.exs @@ -0,0 +1,47 @@ +defmodule Lightning.Repo.Migrations.AddLogLinesPendingSearchIndex do + use Ecto.Migration + + @disable_ddl_transaction true + @disable_migration_lock true + + @num_partitions 100 + + def up do + # Create the partial index on the parent table (ONLY), unattached so far. 
+ execute(""" + CREATE INDEX IF NOT EXISTS log_lines_pending_search_idx + ON ONLY log_lines (timestamp) + WHERE search_vector IS NULL + """) + + # Build each partition's index CONCURRENTLY (cannot be done on the + # partitioned parent in PG15), then attach them to the parent index. + manage_partitions(@num_partitions, &create_partition_index/2) + manage_partitions(@num_partitions, &attach_partition_index/2) + end + + def down do + # Dropping the parent index cascades to the attached partition indexes. + execute("DROP INDEX IF EXISTS log_lines_pending_search_idx") + end + + defp manage_partitions(num_partitions, manage_function) do + 1..num_partitions + |> Enum.each(&manage_function.(num_partitions, &1)) + end + + defp create_partition_index(_num_partitions, part_num) do + execute(""" + CREATE INDEX CONCURRENTLY IF NOT EXISTS log_lines_#{part_num}_pending_search_idx + ON log_lines_#{part_num} (timestamp) + WHERE search_vector IS NULL + """) + end + + defp attach_partition_index(_num_partitions, part_num) do + execute(""" + ALTER INDEX log_lines_pending_search_idx + ATTACH PARTITION log_lines_#{part_num}_pending_search_idx + """) + end +end diff --git a/priv/repo/migrations/20260530091127_drop_log_lines_search_vector_trigger.exs b/priv/repo/migrations/20260530091127_drop_log_lines_search_vector_trigger.exs new file mode 100644 index 00000000000..95c8843a7a9 --- /dev/null +++ b/priv/repo/migrations/20260530091127_drop_log_lines_search_vector_trigger.exs @@ -0,0 +1,30 @@ +defmodule Lightning.Repo.Migrations.DropLogLinesSearchVectorTrigger do + use Ecto.Migration + + def up do + execute("SET lock_timeout = '5s'") + execute("DROP TRIGGER IF EXISTS set_search_vector ON log_lines") + execute("DROP FUNCTION IF EXISTS update_search_vector()") + end + + def down do + execute(""" + CREATE OR REPLACE FUNCTION public.update_search_vector() + RETURNS trigger + LANGUAGE plpgsql + AS $function$ + begin + UPDATE log_lines SET search_vector = to_tsvector('english_nostop', message) 
WHERE id = NEW.id; + RETURN NEW; + end + $function$ ; + """) + + execute(""" + CREATE TRIGGER set_search_vector + AFTER INSERT ON log_lines FOR EACH ROW + WHEN (NEW."search_vector" IS NULL) + EXECUTE PROCEDURE update_search_vector(); + """) + end +end diff --git a/test/lightning/log_lines/search_vector_worker_test.exs b/test/lightning/log_lines/search_vector_worker_test.exs new file mode 100644 index 00000000000..8ce9c308800 --- /dev/null +++ b/test/lightning/log_lines/search_vector_worker_test.exs @@ -0,0 +1,146 @@ +defmodule Lightning.LogLines.SearchVectorWorkerTest do + use Lightning.DataCase, async: false + + import Lightning.Factories + + alias Lightning.LogLines.SearchVectorWorker + alias Lightning.Repo + alias Lightning.Runs + + setup do + dataclip = insert(:dataclip) + %{triggers: [trigger]} = workflow = insert(:simple_workflow) + + %{runs: [run]} = + work_order_for(trigger, workflow: workflow, dataclip: dataclip) + |> insert() + + %{run: run} + end + + # Inserts a log line via the public API. With the synchronous trigger removed + # this leaves `search_vector` NULL, which is exactly the pending state the + # worker drains. + defp append_log(run, message) do + {:ok, log_line} = + Runs.append_run_log(run, %{ + message: message, + timestamp: DateTime.utc_now() |> DateTime.to_unix(:millisecond) + }) + + log_line + end + + # Inserts a log line directly, bypassing the API, so we can stuff in an + # oversized message that would blow past the 1MB tsvector limit. `search_vector` + # is left NULL just like the regular insert path. 
+ defp insert_raw_log(run, message) do + id = Ecto.UUID.generate() + + Repo.query!( + """ + INSERT INTO log_lines (id, run_id, message, timestamp) + VALUES ($1::uuid, $2::uuid, $3, now()) + """, + [ + Ecto.UUID.dump!(id), + Ecto.UUID.dump!(run.id), + message + ] + ) + + id + end + + defp search_vector_state(id) do + %{rows: [[is_null, matches]]} = + Repo.query!( + """ + SELECT search_vector IS NULL, + COALESCE(search_vector @@ to_tsquery('english_nostop', 'logline'), false) + FROM log_lines WHERE id = $1::uuid + """, + [Ecto.UUID.dump!(id)] + ) + + %{null?: is_null, matches?: matches} + end + + describe "perform/1" do + test "fills search_vector for pending log lines so they become searchable", + %{run: run} do + ids = + for n <- 1..5 do + append_log(run, "logline number #{n} doing work").id + end + + # Freshly inserted lines start out unindexed (deferred computation). + for id <- ids do + assert %{null?: true, matches?: false} = search_vector_state(id) + end + + assert {:ok, 5} = perform_job(SearchVectorWorker, %{}) + + # After draining, every row has a populated, matching search_vector. + for id <- ids do + assert %{null?: false, matches?: true} = search_vector_state(id) + end + end + + test "an oversized message becomes an empty vector without rolling back the batch", + %{run: run} do + normal_id = append_log(run, "logline a normal searchable entry").id + + # 200k distinct words exceeds the 1MB tsvector limit; safe_to_tsvector + # swallows the program_limit_exceeded and returns ''::tsvector. + oversized = + 1..200_000 + |> Enum.map_join(" ", &"w#{&1}") + + oversized_id = insert_raw_log(run, oversized) + + assert {:ok, filled} = perform_job(SearchVectorWorker, %{}) + assert filled == 2 + + # The normal row in the same batch still got indexed. + assert %{null?: false, matches?: true} = search_vector_state(normal_id) + + # The oversized row is set to a non-NULL empty vector (so it leaves the + # pending set and is not retried forever) but matches nothing. 
+ assert %{null?: false, matches?: false} = search_vector_state(oversized_id) + + %{rows: [[vector]]} = + Repo.query!( + "SELECT search_vector::text FROM log_lines WHERE id = $1::uuid", + [Ecto.UUID.dump!(oversized_id)] + ) + + assert vector == "" + end + + test "does not snowball when the per-run budget is not exhausted", %{ + run: run + } do + for n <- 1..3, do: append_log(run, "logline modest backlog #{n}") + + Oban.Testing.with_testing_mode(:manual, fn -> + assert {:ok, 3} = perform_job(SearchVectorWorker, %{}) + + refute_enqueued(worker: SearchVectorWorker) + end) + end + + test "is idempotent: a second run with nothing pending fills 0 and snowballs nothing", + %{run: run} do + for n <- 1..3, do: append_log(run, "logline #{n}") + + assert {:ok, 3} = perform_job(SearchVectorWorker, %{}) + + Oban.Testing.with_testing_mode(:manual, fn -> + assert {:ok, 0} = perform_job(SearchVectorWorker, %{}) + + refute_enqueued(worker: SearchVectorWorker) + end) + end + end +end diff --git a/test/lightning/runs_test.exs b/test/lightning/runs_test.exs index 6029d971855..ff6188bf65f 100644 --- a/test/lightning/runs_test.exs +++ b/test/lightning/runs_test.exs @@ -13,6 +13,34 @@ defmodule Lightning.RunsTest do alias Lightning.WorkOrders alias Lightning.Workflows + # LogLine's Ecto schema has no search_vector field, so assert against the + # column via raw SQL. 
+ defp search_vector_null?(id) do + %{rows: [[is_null]]} = + Lightning.Repo.query!( + "SELECT search_vector IS NULL FROM log_lines WHERE id = $1::uuid", + [Ecto.UUID.dump!(id)] + ) + + is_null + end + + defp log_line_searchable?(id, term) do + %{rows: [[matches]]} = + Lightning.Repo.query!( + """ + SELECT COALESCE( + search_vector @@ to_tsquery('english_nostop', $2), + false + ) + FROM log_lines WHERE id = $1::uuid + """, + [Ecto.UUID.dump!(id), term] + ) + + matches + end + describe "enqueue/1" do test "enqueues a run" do dataclip = insert(:dataclip) @@ -878,6 +906,64 @@ defmodule Lightning.RunsTest do assert log_line.message == ~s<{"foo":"bar"}> end + + test "leaves search_vector NULL at insert (deferred indexing)" do + dataclip = insert(:dataclip) + %{triggers: [trigger], jobs: [_job]} = workflow = insert(:simple_workflow) + + %{runs: [run]} = + work_order_for(trigger, workflow: workflow, dataclip: dataclip) + |> insert() + + {:ok, log_line} = + Runs.append_run_log(run, %{ + message: "a searchable logline message", + timestamp: DateTime.utc_now() |> DateTime.to_unix(:millisecond) + }) + + # The synchronous trigger is gone: search_vector is computed out-of-band + # by Lightning.LogLines.SearchVectorWorker, so it starts NULL and is not + # yet matched by a full-text query. 
+ assert search_vector_null?(log_line.id) + refute log_line_searchable?(log_line.id, "searchable") + end + end + + describe "append_run_logs_batch/3" do + test "inserts all lines, broadcasts log_appended, and defers search_vector" do + dataclip = insert(:dataclip) + %{triggers: [trigger]} = workflow = insert(:simple_workflow) + + %{runs: [run]} = + work_order_for(trigger, workflow: workflow, dataclip: dataclip) + |> insert() + + Runs.subscribe(run) + + now = DateTime.utc_now() |> DateTime.to_unix(:millisecond) + + entries = [ + %{message: "batch logline alpha", timestamp: now}, + %{message: "batch logline beta", timestamp: now + 1}, + %{message: "batch logline gamma", timestamp: now + 2} + ] + + {:ok, log_lines} = Runs.append_run_logs_batch(run, entries) + + assert length(log_lines) == 3 + assert Enum.map(log_lines, & &1.message) == Enum.map(entries, & &1.message) + + # Each inserted line broadcasts a LogAppended event. + for _ <- entries do + assert_received %Runs.Events.LogAppended{} + end + + # All rows are persisted with a NULL search_vector (deferred indexing). + for log_line <- log_lines do + assert search_vector_null?(log_line.id) + refute log_line_searchable?(log_line.id, "logline") + end + end end describe "mark_unfinished_steps_lost/1" do From d458ad4e375cc39470f78ec397f5d017583074f8 Mon Sep 17 00:00:00 2001 From: Stuart Corbishley Date: Sat, 30 May 2026 20:01:31 +0200 Subject: [PATCH 2/5] Harden log_lines search_vector deferral Follow-up hardening on the search_vector deferral, from a review pass: - Fix the worker's snowball chain. Oban's default unique states include :executing and :completed, so a running snowball job matched itself when enqueueing its successor and the insert was silently deduped, breaking the chain after one hop. Restrict uniqueness to [:available, :scheduled]. 
- Make the pending-search index migration re-runnable: a failed CREATE INDEX CONCURRENTLY leaves an INVALID index behind; IF NOT EXISTS would then skip rebuilding it, and ATTACH would leave the parent invalid forever. Drop any invalid leftover first. - Make safe_to_tsvector NULL-defensive and re-runnable (CREATE OR REPLACE, drop STRICT, COALESCE the doc) so it never returns NULL, which would leave a row stuck in the pending set. This is also the template for the dataclip fix. - Add snowball uniqueness regression tests. --- CHANGELOG.md | 5 ++ .../log_lines/search_vector_worker.ex | 9 +++- ...30091125_add_safe_to_tsvector_function.exs | 10 ++-- ...126_add_log_lines_pending_search_index.exs | 19 +++++++ .../log_lines/search_vector_worker_test.exs | 49 +++++++++++++++++++ 5 files changed, 88 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 87948758c96..5b94dac21da 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,6 +27,11 @@ and this project adheres to ### Fixed +- Reduce `run:log` channel timeouts under heavy log volume by moving `log_lines` + search indexing off the insert path. The full-text search vector is now + backfilled by a background worker rather than computed synchronously on every + insert, so log search is eventually-consistent (typically within a minute). 
+ [#4425](https://github.com/OpenFn/lightning/issues/4425) - Channel join crashes when multiple users open the same workflow concurrently [#4802](https://github.com/OpenFn/lightning/issues/4802) - Fix `purge_deleted` Oban job crashing when a soft-deleted project has diff --git a/lib/lightning/log_lines/search_vector_worker.ex b/lib/lightning/log_lines/search_vector_worker.ex index cf89a902eb1..6cc39ce132c 100644 --- a/lib/lightning/log_lines/search_vector_worker.ex +++ b/lib/lightning/log_lines/search_vector_worker.ex @@ -39,7 +39,14 @@ defmodule Lightning.LogLines.SearchVectorWorker do queue: :search_indexing, priority: 1, max_attempts: 10, - unique: [period: 55, keys: [:trigger]] + # `states` is restricted to the queued states on purpose. Oban's default + # unique states include `:executing` and `:completed`, which would make a + # running snowball job match *itself* when it tries to enqueue its + # successor, silently dedup the insert, and break the chain after a single + # hop. Limiting uniqueness to `:available`/`:scheduled` still guarantees at + # most one queued snowball (and one queued cron heartbeat, via the distinct + # `:trigger` key) while letting the executing job enqueue the next link. + unique: [period: 55, keys: [:trigger], states: [:available, :scheduled]] alias Lightning.Repo diff --git a/priv/repo/migrations/20260530091125_add_safe_to_tsvector_function.exs b/priv/repo/migrations/20260530091125_add_safe_to_tsvector_function.exs index a63b41c109b..d8c4ece9097 100644 --- a/priv/repo/migrations/20260530091125_add_safe_to_tsvector_function.exs +++ b/priv/repo/migrations/20260530091125_add_safe_to_tsvector_function.exs @@ -2,11 +2,15 @@ defmodule Lightning.Repo.Migrations.AddSafeToTsvectorFunction do use Ecto.Migration def up do + # Not STRICT: a STRICT function returns NULL (without running) when `doc` is + # NULL, which would leave the row's search_vector NULL forever and stuck in + # the pending index. 
COALESCE the doc instead so the function always yields + # a non-NULL tsvector. CREATE OR REPLACE keeps the migration re-runnable. execute(""" - CREATE FUNCTION safe_to_tsvector(config regconfig, doc text) RETURNS tsvector - LANGUAGE plpgsql IMMUTABLE STRICT AS $$ + CREATE OR REPLACE FUNCTION safe_to_tsvector(config regconfig, doc text) RETURNS tsvector + LANGUAGE plpgsql IMMUTABLE AS $$ BEGIN - RETURN to_tsvector(config, doc); + RETURN to_tsvector(config, COALESCE(doc, '')); EXCEPTION WHEN program_limit_exceeded THEN RETURN ''::tsvector; END; diff --git a/priv/repo/migrations/20260530091126_add_log_lines_pending_search_index.exs b/priv/repo/migrations/20260530091126_add_log_lines_pending_search_index.exs index a8b5d777a88..93351f8e3df 100644 --- a/priv/repo/migrations/20260530091126_add_log_lines_pending_search_index.exs +++ b/priv/repo/migrations/20260530091126_add_log_lines_pending_search_index.exs @@ -31,6 +31,25 @@ defmodule Lightning.Repo.Migrations.AddLogLinesPendingSearchIndex do end defp create_partition_index(_num_partitions, part_num) do + # A failed CREATE INDEX CONCURRENTLY leaves an INVALID index behind. The + # IF NOT EXISTS below would then skip rebuilding it, and the subsequent + # ATTACH would leave the parent permanently INVALID. Drop any invalid + # leftover first so a re-run rebuilds it cleanly. (A valid index is kept; + # only invalid ones are dropped.) 
+ execute(""" + DO $$ + BEGIN + IF EXISTS ( + SELECT 1 FROM pg_class c + JOIN pg_index i ON i.indexrelid = c.oid + WHERE c.relname = 'log_lines_#{part_num}_pending_search_idx' + AND NOT i.indisvalid + ) THEN + EXECUTE 'DROP INDEX log_lines_#{part_num}_pending_search_idx'; + END IF; + END $$; + """) + execute(""" CREATE INDEX CONCURRENTLY IF NOT EXISTS log_lines_#{part_num}_pending_search_idx ON log_lines_#{part_num} (timestamp) diff --git a/test/lightning/log_lines/search_vector_worker_test.exs b/test/lightning/log_lines/search_vector_worker_test.exs index 8ce9c308800..4525cb1d33d 100644 --- a/test/lightning/log_lines/search_vector_worker_test.exs +++ b/test/lightning/log_lines/search_vector_worker_test.exs @@ -143,4 +143,53 @@ defmodule Lightning.LogLines.SearchVectorWorkerTest do end) end end + + describe "snowball uniqueness" do + # Regression: Oban's default unique states include :executing and :completed, + # so a running snowball job (state :executing) matched *itself* when it tried + # to enqueue its successor — the insert was silently deduped and the chain + # died after one hop. The worker restricts uniqueness to the queued states so + # an executing job can always enqueue the next link. + test "an executing snowball does not block enqueuing its successor" do + Oban.Testing.with_testing_mode(:manual, fn -> + {:ok, running} = + Oban.insert( + Lightning.Oban, + SearchVectorWorker.new(%{"trigger" => "snowball"}) + ) + + # Mimic Oban marking the job as executing while perform/1 runs. + from(j in Oban.Job, where: j.id == ^running.id) + |> Repo.update_all(set: [state: "executing"]) + + {:ok, successor} = + Oban.insert( + Lightning.Oban, + SearchVectorWorker.new(%{"trigger" => "snowball"}) + ) + + refute successor.conflict? 
+ refute successor.id == running.id + end) + end + + test "two queued snowballs are deduped to one" do + Oban.Testing.with_testing_mode(:manual, fn -> + {:ok, first} = + Oban.insert( + Lightning.Oban, + SearchVectorWorker.new(%{"trigger" => "snowball"}) + ) + + {:ok, second} = + Oban.insert( + Lightning.Oban, + SearchVectorWorker.new(%{"trigger" => "snowball"}) + ) + + assert second.conflict? + assert second.id == first.id + end) + end + end end From c98529a390442a1fbbf8020e9fe591fb443488ff Mon Sep 17 00:00:00 2001 From: Stuart Corbishley Date: Sat, 30 May 2026 20:29:46 +0200 Subject: [PATCH 3/5] Refine comments and moduledoc for deferred log_lines indexing Trim hindsight/diff-narrating comments down to what's non-obvious, and rewrite the SearchVectorWorker moduledoc to read as documentation of the mechanism rather than a justification of the change. --- .../log_lines/search_vector_worker.ex | 72 +++++++------------ ...30091125_add_safe_to_tsvector_function.exs | 7 +- ...126_add_log_lines_pending_search_index.exs | 10 ++- .../log_lines/search_vector_worker_test.exs | 15 ++-- test/lightning/runs_test.exs | 7 +- 5 files changed, 41 insertions(+), 70 deletions(-) diff --git a/lib/lightning/log_lines/search_vector_worker.ex b/lib/lightning/log_lines/search_vector_worker.ex index 6cc39ce132c..b02face5a5c 100644 --- a/lib/lightning/log_lines/search_vector_worker.ex +++ b/lib/lightning/log_lines/search_vector_worker.ex @@ -1,60 +1,43 @@ defmodule Lightning.LogLines.SearchVectorWorker do @moduledoc """ - Asynchronously backfills `log_lines.search_vector` for rows that were left - `NULL` at insert time. - - ## Why defer the tsvector? - - Computing the full-text `search_vector` synchronously (via an insert trigger) - put `to_tsvector` on the hot path of every log line write. Under heavy run - load that work serialises behind the worker's log firehose and slows - ingestion. 
A sibling migration removes the synchronous trigger, leaving the - column `NULL` on insert, and adds: - - * a `safe_to_tsvector(regconfig, text)` SQL function (tolerant of bad input); - * a partial index `... WHERE search_vector IS NULL` so finding pending rows - stays cheap. - - This worker then fills `search_vector` out-of-band. The read side - (`Lightning.Invocation`) queries with `to_tsquery('english_nostop', ...)`, so - this worker MUST build vectors with the matching `english_nostop` config, - otherwise searches would silently miss freshly-written log lines. - - ## Draining and snowballing - - Each run drains pending rows in bounded batches (`@batch_size` rows, up to - `@max_batches` per run). When a run consumes its full budget there is almost - certainly more backlog, so it enqueues an immediate follow-up job (a - "snowball") rather than waiting for the next 1-minute cron tick. This lets the - worker keep pace with bursty load while the dedicated `search_indexing` queue - (concurrency 1) plus job uniqueness keep the snowball self-limiting. - - The cron entry enqueues with default args; the snowball uses - `%{"trigger" => "snowball"}`. The differing `trigger` key produces a distinct - uniqueness key, so a queued snowball is never swallowed by the cron job (and - vice versa). + Backfills the full-text `search_vector` on `log_lines` rows. + + Log lines are inserted with `search_vector` left `NULL`; the vector is built + here rather than on the insert path, keeping `to_tsvector` off the hot path of + high-volume log ingestion. Search is eventually consistent as a result, + typically catching up within a minute. + + Two database objects support this: `safe_to_tsvector(regconfig, text)`, which + builds the vector while tolerating NULL and oversized input, and a partial + index over `search_vector IS NULL`, which keeps locating pending rows cheap as + the table grows. 
Vectors use the `english_nostop` config to match the read + side (`Lightning.Invocation`), which queries with + `to_tsquery('english_nostop', ...)`. + + Each run drains pending rows newest-first, in batches of `@batch_size` up to + `@max_batches` per run. A run that exhausts its budget leaves backlog behind + and enqueues an immediate follow-up ("snowball"); otherwise the minute-ly cron + tick keeps pace. The worker runs on the dedicated `search_indexing` queue at + concurrency 1, so only one job executes at a time, and the cron tick and the + snowball carry distinct `trigger` args, so job uniqueness allows one of each to + queue but never a duplicate. """ use Oban.Worker, queue: :search_indexing, priority: 1, max_attempts: 10, - # `states` is restricted to the queued states on purpose. Oban's default - # unique states include `:executing` and `:completed`, which would make a - # running snowball job match *itself* when it tries to enqueue its - # successor, silently dedup the insert, and break the chain after a single - # hop. Limiting uniqueness to `:available`/`:scheduled` still guarantees at - # most one queued snowball (and one queued cron heartbeat, via the distinct - # `:trigger` key) while letting the executing job enqueue the next link. + # Restrict uniqueness to queued states. Oban's defaults also dedup against + # :executing/:completed, so a running snowball would match itself and fail + # to enqueue its successor — breaking the chain after one hop. unique: [period: 55, keys: [:trigger], states: [:available, :scheduled]] alias Lightning.Repo require Logger - # Rows to fill per batch. @batch_size 2_500 - # Maximum batches to drain in a single run (per-run budget). + # Per-run budget. @max_batches 10 @drain_sql """ @@ -80,9 +63,8 @@ defmodule Lightning.LogLines.SearchVectorWorker do end) if budget_exhausted? do - # The run hit its per-run budget, so more backlog almost certainly - # remains. 
Snowball an immediate follow-up with a distinct uniqueness key - # so the cron job's uniqueness does not swallow it. + # Budget exhausted, so backlog likely remains: enqueue an immediate + # follow-up rather than waiting for the next cron tick. Oban.insert(Lightning.Oban, __MODULE__.new(%{"trigger" => "snowball"})) end diff --git a/priv/repo/migrations/20260530091125_add_safe_to_tsvector_function.exs b/priv/repo/migrations/20260530091125_add_safe_to_tsvector_function.exs index d8c4ece9097..e6fef135deb 100644 --- a/priv/repo/migrations/20260530091125_add_safe_to_tsvector_function.exs +++ b/priv/repo/migrations/20260530091125_add_safe_to_tsvector_function.exs @@ -2,10 +2,9 @@ defmodule Lightning.Repo.Migrations.AddSafeToTsvectorFunction do use Ecto.Migration def up do - # Not STRICT: a STRICT function returns NULL (without running) when `doc` is - # NULL, which would leave the row's search_vector NULL forever and stuck in - # the pending index. COALESCE the doc instead so the function always yields - # a non-NULL tsvector. CREATE OR REPLACE keeps the migration re-runnable. + # Deliberately not STRICT: a STRICT function returns NULL for a NULL doc, + # which would leave search_vector NULL forever and stuck in the pending + # index. COALESCE instead so the result is always a non-NULL tsvector. 
execute(""" CREATE OR REPLACE FUNCTION safe_to_tsvector(config regconfig, doc text) RETURNS tsvector LANGUAGE plpgsql IMMUTABLE AS $$ diff --git a/priv/repo/migrations/20260530091126_add_log_lines_pending_search_index.exs b/priv/repo/migrations/20260530091126_add_log_lines_pending_search_index.exs index 93351f8e3df..267a58328ed 100644 --- a/priv/repo/migrations/20260530091126_add_log_lines_pending_search_index.exs +++ b/priv/repo/migrations/20260530091126_add_log_lines_pending_search_index.exs @@ -7,7 +7,7 @@ defmodule Lightning.Repo.Migrations.AddLogLinesPendingSearchIndex do @num_partitions 100 def up do - # Create the partial index on the parent table (ONLY), unattached so far. + # Partial index on the parent (ONLY); attached per-partition below. execute(""" CREATE INDEX IF NOT EXISTS log_lines_pending_search_idx ON ONLY log_lines (timestamp) @@ -31,11 +31,9 @@ defmodule Lightning.Repo.Migrations.AddLogLinesPendingSearchIndex do end defp create_partition_index(_num_partitions, part_num) do - # A failed CREATE INDEX CONCURRENTLY leaves an INVALID index behind. The - # IF NOT EXISTS below would then skip rebuilding it, and the subsequent - # ATTACH would leave the parent permanently INVALID. Drop any invalid - # leftover first so a re-run rebuilds it cleanly. (A valid index is kept; - # only invalid ones are dropped.) + # A failed CREATE INDEX CONCURRENTLY leaves an INVALID index that IF NOT + # EXISTS would skip, so the ATTACH below would mark the parent INVALID too. + # Drop any invalid leftover first so a re-run rebuilds cleanly. 
execute(""" DO $$ BEGIN diff --git a/test/lightning/log_lines/search_vector_worker_test.exs b/test/lightning/log_lines/search_vector_worker_test.exs index 4525cb1d33d..a6089c69a28 100644 --- a/test/lightning/log_lines/search_vector_worker_test.exs +++ b/test/lightning/log_lines/search_vector_worker_test.exs @@ -18,9 +18,8 @@ defmodule Lightning.LogLines.SearchVectorWorkerTest do %{run: run} end - # Inserts a log line via the public API. With the synchronous trigger removed - # this leaves `search_vector` NULL, which is exactly the pending state the - # worker drains. + # Inserts via the public API, which leaves `search_vector` NULL — the pending + # state the worker drains. defp append_log(run, message) do {:ok, log_line} = Runs.append_run_log(run, %{ @@ -74,14 +73,12 @@ defmodule Lightning.LogLines.SearchVectorWorkerTest do append_log(run, "logline number #{n} doing work").id end - # Freshly inserted lines start out unindexed (deferred computation). for id <- ids do assert %{null?: true, matches?: false} = search_vector_state(id) end assert {:ok, 5} = perform_job(SearchVectorWorker, %{}) - # After draining, every row has a populated, matching search_vector. for id <- ids do assert %{null?: false, matches?: true} = search_vector_state(id) end @@ -145,11 +142,9 @@ defmodule Lightning.LogLines.SearchVectorWorkerTest do end describe "snowball uniqueness" do - # Regression: Oban's default unique states include :executing and :completed, - # so a running snowball job (state :executing) matched *itself* when it tried - # to enqueue its successor — the insert was silently deduped and the chain - # died after one hop. The worker restricts uniqueness to the queued states so - # an executing job can always enqueue the next link. + # Guards the snowball chain: an executing job must be able to enqueue its + # successor. Oban's default unique states include :executing, so a snowball + # would otherwise match itself and the chain would die after one hop. 
test "an executing snowball does not block enqueuing its successor" do Oban.Testing.with_testing_mode(:manual, fn -> {:ok, running} = diff --git a/test/lightning/runs_test.exs b/test/lightning/runs_test.exs index ff6188bf65f..b3a3477fd3b 100644 --- a/test/lightning/runs_test.exs +++ b/test/lightning/runs_test.exs @@ -921,9 +921,8 @@ defmodule Lightning.RunsTest do timestamp: DateTime.utc_now() |> DateTime.to_unix(:millisecond) }) - # The synchronous trigger is gone: search_vector is computed out-of-band - # by Lightning.LogLines.SearchVectorWorker, so it starts NULL and is not - # yet matched by a full-text query. + # search_vector is computed out-of-band by SearchVectorWorker, so it + # starts NULL and isn't matched by a full-text query yet. assert search_vector_null?(log_line.id) refute log_line_searchable?(log_line.id, "searchable") end @@ -953,12 +952,10 @@ defmodule Lightning.RunsTest do assert length(log_lines) == 3 assert Enum.map(log_lines, & &1.message) == Enum.map(entries, & &1.message) - # Each inserted line broadcasts a LogAppended event. for _ <- entries do assert_received %Runs.Events.LogAppended{} end - # All rows are persisted with a NULL search_vector (deferred indexing). for log_line <- log_lines do assert search_vector_null?(log_line.id) refute log_line_searchable?(log_line.id, "logline") From 7b7f49a0b3179064bccbe63f217a423d07016eae Mon Sep 17 00:00:00 2001 From: Stuart Corbishley Date: Tue, 2 Jun 2026 16:47:02 +0200 Subject: [PATCH 4/5] Flush deferred log search index in invocation search tests Deferring log_lines.search_vector indexing off the insert path means inserted log lines have a NULL search_vector until SearchVectorWorker drains them. Tests that insert log lines and then search them on the log field matched nothing, since the worker never runs on its own in the test environment. 
Add Lightning.TestUtils.flush_log_search_index/0, which runs the worker synchronously in-process via Oban.Testing.perform_job/3 so it indexes the uncommitted sandbox rows, and call it in the two invocation_test setups that insert log lines before searching. Also add a positive log-match assertion to the stemming test so a regression that re-NULLs the vector fails loudly rather than passing on an empty result. --- test/lightning/invocation_test.exs | 16 ++++++++++++++++ test/support/test_utils.ex | 22 ++++++++++++++++++++++ 2 files changed, 38 insertions(+) diff --git a/test/lightning/invocation_test.exs b/test/lightning/invocation_test.exs index 730589a2066..2d278c1b22a 100644 --- a/test/lightning/invocation_test.exs +++ b/test/lightning/invocation_test.exs @@ -1599,6 +1599,8 @@ defmodule Lightning.InvocationTest do timestamp: Timex.now() ) + flush_log_search_index() + %{ project: project, dataclip: dataclip, @@ -1695,6 +1697,18 @@ defmodule Lightning.InvocationTest do test "search on logs does NOT return 'stem' matches... only exact matches", %{project: project} do + # Positive control: the log vector is populated, so an exact token matches. + # Without this, a regression that leaves search_vector NULL would make the + # negative assertions below pass vacuously. 
+ assert [_found] = + Invocation.search_workorders( + project, + SearchParams.new(%{ + "search_term" => "playing", + "search_fields" => ["log"] + }) + ).entries + assert [] = Invocation.search_workorders( project, @@ -1943,6 +1957,8 @@ defmodule Lightning.InvocationTest do timestamp: Timex.now() ) + flush_log_search_index() + %{ project: project, workorder: workorder, diff --git a/test/support/test_utils.ex b/test/support/test_utils.ex index 3f5c50af19a..0484517767f 100644 --- a/test/support/test_utils.ex +++ b/test/support/test_utils.ex @@ -1,6 +1,8 @@ defmodule Lightning.TestUtils do @moduledoc false + alias Lightning.LogLines.SearchVectorWorker + @doc """ Assert that the given context has the given keys, otherwise raise an error. @@ -19,6 +21,26 @@ defmodule Lightning.TestUtils do :ok end + @doc """ + Drain the deferred `log_lines.search_vector` backlog synchronously. + + Log lines are inserted with `search_vector` left NULL and indexed asynchronously + by `Lightning.LogLines.SearchVectorWorker`. In tests that insert log lines and then + query log search, call this after inserting (and before searching) so the vector is + populated within the SQL sandbox. + + Runs the worker in-process via `Oban.Testing.perform_job/3`, so it sees the + uncommitted sandbox rows. Returns the number of rows indexed; a no-op (`0`) when + nothing is pending, so it is safe to call unconditionally. + """ + @spec flush_log_search_index() :: non_neg_integer() + def flush_log_search_index do + {:ok, indexed} = + Oban.Testing.perform_job(SearchVectorWorker, %{}, repo: Lightning.Repo) + + indexed + end + @doc """ Merge the given setups into the given context. Just works a bit like `setup` on ExUnit.Case. 
From a0181748406a13d9d011d464c3fb9df9e8fa8f59 Mon Sep 17 00:00:00 2001 From: Stuart Corbishley Date: Wed, 3 Jun 2026 17:50:42 +0200 Subject: [PATCH 5/5] Cover log_lines search_vector budget-exhaustion path Make the SearchVectorWorker batch_size/max_batches configurable through the Lightning.Config seam (defaults 2500/10 in config.exs, 2/2 in test.exs) so a test can drive the budget-exhaustion/snowball path with a handful of rows. Restructure drain/2 into drain/4 since the max_batches guard can no longer be a compile-time literal, and add a test that exercises the recursive drain, budget guard, and snowball enqueue. --- config/config.exs | 2 + config/test.exs | 4 ++ lib/lightning/config.ex | 24 ++++++++++++ .../log_lines/search_vector_worker.ex | 38 +++++++++---------- .../log_lines/search_vector_worker_test.exs | 24 +++++++++++- 5 files changed, 71 insertions(+), 21 deletions(-) diff --git a/config/config.exs b/config/config.exs index 4b0d2bf1ec1..2881e963a58 100644 --- a/config/config.exs +++ b/config/config.exs @@ -187,6 +187,8 @@ config :lightning, :is_resettable_demo, false config :lightning, :default_retention_period, nil config :lightning, :claim_work_mem, nil +config :lightning, :log_lines_search_indexing, batch_size: 2_500, max_batches: 10 + config :lightning, Lightning.Runtime.RuntimeManager, start: false config :lightning, LightningWeb.CollectionsController, diff --git a/config/test.exs b/config/test.exs index 9641f9e9a60..4456d81180b 100644 --- a/config/test.exs +++ b/config/test.exs @@ -133,6 +133,10 @@ config :lightning, CLI, child_process_mod: FakeRambo config :lightning, :is_resettable_demo, true +# Tiny budget so a handful of pending rows fills multiple full batches and trips +# the per-run budget guard, exercising the snowball follow-up path. 
+config :lightning, :log_lines_search_indexing, batch_size: 2, max_batches: 2 + config :lightning, :github_app, app_id: "111111", app_name: "test-github", diff --git a/lib/lightning/config.ex b/lib/lightning/config.ex index 67ba7a744d9..734864e1018 100644 --- a/lib/lightning/config.ex +++ b/lib/lightning/config.ex @@ -97,6 +97,20 @@ defmodule Lightning.Config do Application.get_env(:lightning, :activity_cleanup_chunk_size) end + @impl true + def log_lines_search_indexing_batch_size do + log_lines_search_indexing_config() |> Keyword.fetch!(:batch_size) + end + + @impl true + def log_lines_search_indexing_max_batches do + log_lines_search_indexing_config() |> Keyword.fetch!(:max_batches) + end + + defp log_lines_search_indexing_config do + Application.get_env(:lightning, :log_lines_search_indexing, []) + end + @impl true def default_ecto_database_timeout do Application.get_env(:lightning, Lightning.Repo) |> Keyword.get(:timeout) @@ -483,6 +497,8 @@ defmodule Lightning.Config do @callback promex_enabled?() :: boolean() @callback purge_deleted_after_days() :: integer() @callback activity_cleanup_chunk_size() :: integer() + @callback log_lines_search_indexing_batch_size() :: pos_integer() + @callback log_lines_search_indexing_max_batches() :: pos_integer() @callback default_ecto_database_timeout() :: integer() @callback repo_connection_token_signer() :: Joken.Signer.t() @callback reset_password_token_validity_in_days() :: integer() @@ -597,6 +613,14 @@ defmodule Lightning.Config do impl().activity_cleanup_chunk_size() end + def log_lines_search_indexing_batch_size do + impl().log_lines_search_indexing_batch_size() + end + + def log_lines_search_indexing_max_batches do + impl().log_lines_search_indexing_max_batches() + end + def default_ecto_database_timeout do impl().default_ecto_database_timeout() end diff --git a/lib/lightning/log_lines/search_vector_worker.ex b/lib/lightning/log_lines/search_vector_worker.ex index b02face5a5c..b742e5c2a19 100644 --- 
a/lib/lightning/log_lines/search_vector_worker.ex +++ b/lib/lightning/log_lines/search_vector_worker.ex @@ -14,8 +14,9 @@ defmodule Lightning.LogLines.SearchVectorWorker do side (`Lightning.Invocation`), which queries with `to_tsquery('english_nostop', ...)`. - Each run drains pending rows newest-first, in batches of `@batch_size` up to - `@max_batches` per run. A run that exhausts its budget leaves backlog behind + Each run drains pending rows newest-first, in batches up to a per-run budget + (batch size and max batches are configurable via `Lightning.Config`). A run + that exhausts its budget leaves backlog behind and enqueues an immediate follow-up ("snowball"); otherwise the minute-ly cron tick keeps pace. The worker runs on the dedicated `search_indexing` queue at concurrency 1, so only one job executes at a time, and the cron tick and the @@ -36,10 +37,6 @@ defmodule Lightning.LogLines.SearchVectorWorker do require Logger - @batch_size 2_500 - # Per-run budget. - @max_batches 10 - @drain_sql """ WITH pending AS ( SELECT id, run_id FROM log_lines @@ -54,7 +51,10 @@ defmodule Lightning.LogLines.SearchVectorWorker do @impl Oban.Worker def perform(%Oban.Job{}) do - {filled, budget_exhausted?} = drain(0, 0) + batch_size = Lightning.Config.log_lines_search_indexing_batch_size() + max_batches = Lightning.Config.log_lines_search_indexing_max_batches() + + {filled, budget_exhausted?} = drain(0, 0, batch_size, max_batches) Logger.info(fn -> # coveralls-ignore-start @@ -71,20 +71,20 @@ defmodule Lightning.LogLines.SearchVectorWorker do {:ok, filled} end - # Drains up to @max_batches batches, accumulating the number of rows filled. + # Drains up to max_batches batches, accumulating the number of rows filled. # Returns {filled, budget_exhausted?}. Stops early when a batch fills fewer - # than @batch_size rows (backlog drained). 
- defp drain(filled, batches) when batches >= @max_batches do - {filled, true} - end - - defp drain(filled, batches) do - %{num_rows: num_rows} = Repo.query!(@drain_sql, [@batch_size]) - - if num_rows < @batch_size do - {filled + num_rows, false} + # than batch_size rows (backlog drained). + defp drain(filled, batches, batch_size, max_batches) do + if batches >= max_batches do + {filled, true} else - drain(filled + num_rows, batches + 1) + %{num_rows: num_rows} = Repo.query!(@drain_sql, [batch_size]) + + if num_rows < batch_size do + {filled + num_rows, false} + else + drain(filled + num_rows, batches + 1, batch_size, max_batches) + end end end end diff --git a/test/lightning/log_lines/search_vector_worker_test.exs b/test/lightning/log_lines/search_vector_worker_test.exs index a6089c69a28..ee1062faf35 100644 --- a/test/lightning/log_lines/search_vector_worker_test.exs +++ b/test/lightning/log_lines/search_vector_worker_test.exs @@ -68,8 +68,11 @@ defmodule Lightning.LogLines.SearchVectorWorkerTest do describe "perform/1" do test "fills search_vector for pending log lines so they become searchable", %{run: run} do + # Stay within the tiny test budget (batch_size: 2, max_batches: 2) so a + # single run drains everything: batch 1 fills 2, batch 2 fills 1 (< 2) and + # stops without tripping the budget. ids = - for n <- 1..5 do + for n <- 1..3 do append_log(run, "logline number #{n} doing work").id end @@ -77,7 +80,7 @@ defmodule Lightning.LogLines.SearchVectorWorkerTest do assert %{null?: true, matches?: false} = search_vector_state(id) end - assert {:ok, 5} = perform_job(SearchVectorWorker, %{}) + assert {:ok, 3} = perform_job(SearchVectorWorker, %{}) for id <- ids do assert %{null?: false, matches?: true} = search_vector_state(id) @@ -127,6 +130,23 @@ defmodule Lightning.LogLines.SearchVectorWorkerTest do end) end + test "snowballs an immediate follow-up when the per-run budget is exhausted", + %{run: run} do + # config/test.exs sets batch_size: 2, max_batches: 2. 
With 5 pending rows + # the run fills two full batches (4 rows), reaches max_batches, and trips + # the budget guard, leaving backlog behind and enqueuing a snowball. + for n <- 1..5, do: append_log(run, "logline overflowing backlog #{n}") + + Oban.Testing.with_testing_mode(:manual, fn -> + assert {:ok, 4} = perform_job(SearchVectorWorker, %{}) + + assert_enqueued( + worker: SearchVectorWorker, + args: %{"trigger" => "snowball"} + ) + end) + end + test "is idempotent: a second run with nothing pending fills 0 and snowballs nothing", %{run: run} do for n <- 1..3, do: append_log(run, "logline #{n}")