diff --git a/CHANGELOG.md b/CHANGELOG.md index 87948758c96..5b94dac21da 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,6 +27,11 @@ and this project adheres to ### Fixed +- Reduce `run:log` channel timeouts under heavy log volume by moving `log_lines` + search indexing off the insert path. The full-text search vector is now + backfilled by a background worker rather than computed synchronously on every + insert, so log search is eventually-consistent (typically within a minute). + [#4425](https://github.com/OpenFn/lightning/issues/4425) - Channel join crashes when multiple users open the same workflow concurrently [#4802](https://github.com/OpenFn/lightning/issues/4802) - Fix `purge_deleted` Oban job crashing when a soft-deleted project has diff --git a/config/config.exs b/config/config.exs index 4b0d2bf1ec1..2881e963a58 100644 --- a/config/config.exs +++ b/config/config.exs @@ -187,6 +187,8 @@ config :lightning, :is_resettable_demo, false config :lightning, :default_retention_period, nil config :lightning, :claim_work_mem, nil +config :lightning, :log_lines_search_indexing, batch_size: 2_500, max_batches: 10 + config :lightning, Lightning.Runtime.RuntimeManager, start: false config :lightning, LightningWeb.CollectionsController, diff --git a/config/test.exs b/config/test.exs index 9641f9e9a60..4456d81180b 100644 --- a/config/test.exs +++ b/config/test.exs @@ -133,6 +133,10 @@ config :lightning, CLI, child_process_mod: FakeRambo config :lightning, :is_resettable_demo, true +# Tiny budget so a handful of pending rows fills multiple full batches and trips +# the per-run budget guard, exercising the snowball follow-up path. +config :lightning, :log_lines_search_indexing, batch_size: 2, max_batches: 2 + config :lightning, :github_app, app_id: "111111", app_name: "test-github", diff --git a/lib/lightning/config.ex b/lib/lightning/config.ex index 67ba7a744d9..734864e1018 100644 --- a/lib/lightning/config.ex +++ b/lib/lightning/config.ex @@ -97,6 +97,20 @@ defmodule Lightning.Config do Application.get_env(:lightning, :activity_cleanup_chunk_size) end + @impl true + def log_lines_search_indexing_batch_size do + log_lines_search_indexing_config() |> Keyword.fetch!(:batch_size) + end + + @impl true + def log_lines_search_indexing_max_batches do + log_lines_search_indexing_config() |> Keyword.fetch!(:max_batches) + end + + defp log_lines_search_indexing_config do + Application.get_env(:lightning, :log_lines_search_indexing, []) + end + @impl true def default_ecto_database_timeout do Application.get_env(:lightning, Lightning.Repo) |> Keyword.get(:timeout) @@ -483,6 +497,8 @@ defmodule Lightning.Config do @callback promex_enabled?() :: boolean() @callback purge_deleted_after_days() :: integer() @callback activity_cleanup_chunk_size() :: integer() + @callback log_lines_search_indexing_batch_size() :: pos_integer() + @callback log_lines_search_indexing_max_batches() :: pos_integer() @callback default_ecto_database_timeout() :: integer() @callback repo_connection_token_signer() :: Joken.Signer.t() @callback reset_password_token_validity_in_days() :: integer() @@ -597,6 +613,14 @@ defmodule Lightning.Config do impl().activity_cleanup_chunk_size() end + def log_lines_search_indexing_batch_size do + impl().log_lines_search_indexing_batch_size() + end + + def log_lines_search_indexing_max_batches do + impl().log_lines_search_indexing_max_batches() + end + def default_ecto_database_timeout do impl().default_ecto_database_timeout() end diff --git a/lib/lightning/config/bootstrap.ex b/lib/lightning/config/bootstrap.ex index cb7003eb39b..69daccf456f 100644 --- a/lib/lightning/config/bootstrap.ex +++ b/lib/lightning/config/bootstrap.ex @@ -270,7 +270,8 @@ defmodule Lightning.Config.Bootstrap do args: %{"type" => "monthly_project_digest"}}, # TODO - move this into an ENV? {"17 */2 * * *", Lightning.Projects, args: %{"type" => "data_retention"}}, - {"*/10 * * * *", Lightning.KafkaTriggers.DuplicateTrackingCleanupWorker} + {"*/10 * * * *", Lightning.KafkaTriggers.DuplicateTrackingCleanupWorker}, + {"* * * * *", Lightning.LogLines.SearchVectorWorker} ] cleanup_cron = @@ -300,7 +301,8 @@ defmodule Lightning.Config.Bootstrap do workflow_failures: 1, background: 1, history_exports: 1, - ai_assistant: 10 + ai_assistant: 10, + search_indexing: 1 ] # https://plausible.io/ is an open-source, privacy-friendly alternative to diff --git a/lib/lightning/log_lines/search_vector_worker.ex b/lib/lightning/log_lines/search_vector_worker.ex new file mode 100644 index 00000000000..b742e5c2a19 --- /dev/null +++ b/lib/lightning/log_lines/search_vector_worker.ex @@ -0,0 +1,90 @@ +defmodule Lightning.LogLines.SearchVectorWorker do + @moduledoc """ + Backfills the full-text `search_vector` on `log_lines` rows. + + Log lines are inserted with `search_vector` left `NULL`; the vector is built + here rather than on the insert path, keeping `to_tsvector` off the hot path of + high-volume log ingestion. Search is eventually consistent as a result, + typically catching up within a minute. + + Two database objects support this: `safe_to_tsvector(regconfig, text)`, which + builds the vector while tolerating NULL and oversized input, and a partial + index over `search_vector IS NULL`, which keeps locating pending rows cheap as + the table grows. Vectors use the `english_nostop` config to match the read + side (`Lightning.Invocation`), which queries with + `to_tsquery('english_nostop', ...)`. + + Each run drains pending rows newest-first, in batches up to a per-run budget + (batch size and max batches are configurable via `Lightning.Config`). A run + that exhausts its budget leaves backlog behind + and enqueues an immediate follow-up ("snowball"); otherwise the minute-ly cron + tick keeps pace. The worker runs on the dedicated `search_indexing` queue at + concurrency 1, so only one job executes at a time, and the cron tick and the + snowball carry distinct `trigger` args, so job uniqueness allows one of each to + queue but never a duplicate. + """ + + use Oban.Worker, + queue: :search_indexing, + priority: 1, + max_attempts: 10, + # Restrict uniqueness to queued states. Oban's defaults also dedup against + # :executing/:completed, so a running snowball would match itself and fail + # to enqueue its successor — breaking the chain after one hop. + unique: [period: 55, keys: [:trigger], states: [:available, :scheduled]] + + alias Lightning.Repo + + require Logger + + @drain_sql """ + WITH pending AS ( + SELECT id, run_id FROM log_lines + WHERE search_vector IS NULL + ORDER BY timestamp DESC + LIMIT $1 FOR UPDATE SKIP LOCKED + ) + UPDATE log_lines l + SET search_vector = safe_to_tsvector('public.english_nostop'::regconfig, l.message) + FROM pending p WHERE l.id = p.id AND l.run_id = p.run_id + """ + + @impl Oban.Worker + def perform(%Oban.Job{}) do + batch_size = Lightning.Config.log_lines_search_indexing_batch_size() + max_batches = Lightning.Config.log_lines_search_indexing_max_batches() + + {filled, budget_exhausted?} = drain(0, 0, batch_size, max_batches) + + Logger.info(fn -> + # coveralls-ignore-start + "LogLines.SearchVectorWorker filled #{filled} search_vector row(s)." + # coveralls-ignore-stop + end) + + if budget_exhausted? do + # Budget exhausted, so backlog likely remains: enqueue an immediate + # follow-up rather than waiting for the next cron tick. + Oban.insert(Lightning.Oban, __MODULE__.new(%{"trigger" => "snowball"})) + end + + {:ok, filled} + end + + # Drains up to max_batches batches, accumulating the number of rows filled. + # Returns {filled, budget_exhausted?}. Stops early when a batch fills fewer + # than batch_size rows (backlog drained). + defp drain(filled, batches, batch_size, max_batches) do + if batches >= max_batches do + {filled, true} + else + %{num_rows: num_rows} = Repo.query!(@drain_sql, [batch_size]) + + if num_rows < batch_size do + {filled + num_rows, false} + else + drain(filled + num_rows, batches + 1, batch_size, max_batches) + end + end + end +end diff --git a/priv/repo/migrations/20260530091125_add_safe_to_tsvector_function.exs b/priv/repo/migrations/20260530091125_add_safe_to_tsvector_function.exs new file mode 100644 index 00000000000..e6fef135deb --- /dev/null +++ b/priv/repo/migrations/20260530091125_add_safe_to_tsvector_function.exs @@ -0,0 +1,23 @@ +defmodule Lightning.Repo.Migrations.AddSafeToTsvectorFunction do + use Ecto.Migration + + def up do + # Deliberately not STRICT: a STRICT function returns NULL for a NULL doc, + # which would leave search_vector NULL forever and stuck in the pending + # index. COALESCE instead so the result is always a non-NULL tsvector. + execute(""" + CREATE OR REPLACE FUNCTION safe_to_tsvector(config regconfig, doc text) RETURNS tsvector + LANGUAGE plpgsql IMMUTABLE AS $$ + BEGIN + RETURN to_tsvector(config, COALESCE(doc, '')); + EXCEPTION WHEN program_limit_exceeded THEN + RETURN ''::tsvector; + END; + $$; + """) + end + + def down do + execute("DROP FUNCTION IF EXISTS safe_to_tsvector(regconfig, text);") + end +end diff --git a/priv/repo/migrations/20260530091126_add_log_lines_pending_search_index.exs b/priv/repo/migrations/20260530091126_add_log_lines_pending_search_index.exs new file mode 100644 index 00000000000..267a58328ed --- /dev/null +++ b/priv/repo/migrations/20260530091126_add_log_lines_pending_search_index.exs @@ -0,0 +1,64 @@ +defmodule Lightning.Repo.Migrations.AddLogLinesPendingSearchIndex do + use Ecto.Migration + + @disable_ddl_transaction true + @disable_migration_lock true + + @num_partitions 100 + + def up do + # Partial index on the parent (ONLY); attached per-partition below. + execute(""" + CREATE INDEX IF NOT EXISTS log_lines_pending_search_idx + ON ONLY log_lines (timestamp) + WHERE search_vector IS NULL + """) + + # Build each partition's index CONCURRENTLY (cannot be done on the + # partitioned parent in PG15), then attach them to the parent index. + manage_partitions(@num_partitions, &create_partition_index/2) + manage_partitions(@num_partitions, &attach_partition_index/2) + end + + def down do + # Dropping the parent index cascades to the attached partition indexes. + execute("DROP INDEX IF EXISTS log_lines_pending_search_idx") + end + + defp manage_partitions(num_partitions, manage_function) do + 1..num_partitions + |> Enum.each(&manage_function.(num_partitions, &1)) + end + + defp create_partition_index(_num_partitions, part_num) do + # A failed CREATE INDEX CONCURRENTLY leaves an INVALID index that IF NOT + # EXISTS would skip, so the ATTACH below would mark the parent INVALID too. + # Drop any invalid leftover first so a re-run rebuilds cleanly. + execute(""" + DO $$ + BEGIN + IF EXISTS ( + SELECT 1 FROM pg_class c + JOIN pg_index i ON i.indexrelid = c.oid + WHERE c.relname = 'log_lines_#{part_num}_pending_search_idx' + AND NOT i.indisvalid + ) THEN + EXECUTE 'DROP INDEX log_lines_#{part_num}_pending_search_idx'; + END IF; + END $$; + """) + + execute(""" + CREATE INDEX CONCURRENTLY IF NOT EXISTS log_lines_#{part_num}_pending_search_idx + ON log_lines_#{part_num} (timestamp) + WHERE search_vector IS NULL + """) + end + + defp attach_partition_index(_num_partitions, part_num) do + execute(""" + ALTER INDEX log_lines_pending_search_idx + ATTACH PARTITION log_lines_#{part_num}_pending_search_idx + """) + end +end diff --git a/priv/repo/migrations/20260530091127_drop_log_lines_search_vector_trigger.exs b/priv/repo/migrations/20260530091127_drop_log_lines_search_vector_trigger.exs new file mode 100644 index 00000000000..95c8843a7a9 --- /dev/null +++ b/priv/repo/migrations/20260530091127_drop_log_lines_search_vector_trigger.exs @@ -0,0 +1,30 @@ +defmodule Lightning.Repo.Migrations.DropLogLinesSearchVectorTrigger do + use Ecto.Migration + + def up do + execute("SET lock_timeout = '5s'") + execute("DROP TRIGGER IF EXISTS set_search_vector ON log_lines") + execute("DROP FUNCTION IF EXISTS update_search_vector()") + end + + def down do + execute(""" + CREATE OR REPLACE FUNCTION public.update_search_vector() + RETURNS trigger + LANGUAGE plpgsql + AS $function$ + begin + UPDATE log_lines SET search_vector = to_tsvector('english_nostop', message) WHERE id = NEW.id; + RETURN NEW; + end + $function$ ; + """) + + execute(""" + CREATE TRIGGER set_search_vector + AFTER INSERT ON log_lines FOR EACH ROW + WHEN (NEW."search_vector" IS NULL) + EXECUTE PROCEDURE update_search_vector(); + """) + end +end diff --git a/test/lightning/invocation_test.exs b/test/lightning/invocation_test.exs index 730589a2066..2d278c1b22a 100644 --- a/test/lightning/invocation_test.exs +++ b/test/lightning/invocation_test.exs @@ -1599,6 +1599,8 @@ defmodule Lightning.InvocationTest do timestamp: Timex.now() ) + flush_log_search_index() + %{ project: project, dataclip: dataclip, @@ -1695,6 +1697,18 @@ defmodule Lightning.InvocationTest do test "search on logs does NOT return 'stem' matches... only exact matches", %{project: project} do + # Positive control: the log vector is populated, so an exact token matches. + # Without this, a regression that leaves search_vector NULL would make the + # negative assertions below pass vacuously. + assert [_found] = + Invocation.search_workorders( + project, + SearchParams.new(%{ + "search_term" => "playing", + "search_fields" => ["log"] + }) + ).entries + assert [] = Invocation.search_workorders( project, @@ -1943,6 +1957,8 @@ defmodule Lightning.InvocationTest do timestamp: Timex.now() ) + flush_log_search_index() + %{ project: project, workorder: workorder, diff --git a/test/lightning/log_lines/search_vector_worker_test.exs b/test/lightning/log_lines/search_vector_worker_test.exs new file mode 100644 index 00000000000..ee1062faf35 --- /dev/null +++ b/test/lightning/log_lines/search_vector_worker_test.exs @@ -0,0 +1,210 @@ +defmodule Lightning.LogLines.SearchVectorWorkerTest do + use Lightning.DataCase, async: false + + import Lightning.Factories + + alias Lightning.LogLines.SearchVectorWorker + alias Lightning.Repo + alias Lightning.Runs + + setup do + dataclip = insert(:dataclip) + %{triggers: [trigger]} = workflow = insert(:simple_workflow) + + %{runs: [run]} = + work_order_for(trigger, workflow: workflow, dataclip: dataclip) + |> insert() + + %{run: run} + end + + # Inserts via the public API, which leaves `search_vector` NULL — the pending + # state the worker drains. + defp append_log(run, message) do + {:ok, log_line} = + Runs.append_run_log(run, %{ + message: message, + timestamp: DateTime.utc_now() |> DateTime.to_unix(:millisecond) + }) + + log_line + end + + # Inserts a log line directly, bypassing the API, so we can stuff in an + # oversized message that would blow past the 1MB tsvector limit. `search_vector` + # is left NULL just like the regular insert path. + defp insert_raw_log(run, message) do + id = Ecto.UUID.generate() + + Repo.query!( + """ + INSERT INTO log_lines (id, run_id, message, timestamp) + VALUES ($1::uuid, $2::uuid, $3, now()) + """, + [ + Ecto.UUID.dump!(id), + Ecto.UUID.dump!(run.id), + message + ] + ) + + id + end + + defp search_vector_state(id) do + %{rows: [[is_null, matches]]} = + Repo.query!( + """ + SELECT search_vector IS NULL, + COALESCE(search_vector @@ to_tsquery('english_nostop', 'logline'), false) + FROM log_lines WHERE id = $1::uuid + """, + [Ecto.UUID.dump!(id)] + ) + + %{null?: is_null, matches?: matches} + end + + describe "perform/1" do + test "fills search_vector for pending log lines so they become searchable", + %{run: run} do + # Stay within the tiny test budget (batch_size: 2, max_batches: 2) so a + # single run drains everything: batch 1 fills 2, batch 2 fills 1 (< 2) and + # stops without tripping the budget. + ids = + for n <- 1..3 do + append_log(run, "logline number #{n} doing work").id + end + + for id <- ids do + assert %{null?: true, matches?: false} = search_vector_state(id) + end + + assert {:ok, 3} = perform_job(SearchVectorWorker, %{}) + + for id <- ids do + assert %{null?: false, matches?: true} = search_vector_state(id) + end + end + + test "an oversized message becomes an empty vector without rolling back the batch", + %{run: run} do + normal_id = append_log(run, "logline a normal searchable entry").id + + # 200k distinct words exceeds the 1MB tsvector limit; safe_to_tsvector + # swallows the program_limit_exceeded and returns ''::tsvector. + oversized = + 1..200_000 + |> Enum.map_join(" ", &"w#{&1}") + + oversized_id = insert_raw_log(run, oversized) + + assert {:ok, filled} = perform_job(SearchVectorWorker, %{}) + assert filled == 2 + + # The normal row in the same batch still got indexed. + assert %{null?: false, matches?: true} = search_vector_state(normal_id) + + # The oversized row is set to a non-NULL empty vector (so it leaves the + # pending set and is not retried forever) but matches nothing. + assert %{null?: false, matches?: false} = search_vector_state(oversized_id) + + %{rows: [[vector]]} = + Repo.query!( + "SELECT search_vector::text FROM log_lines WHERE id = $1::uuid", + [Ecto.UUID.dump!(oversized_id)] + ) + + assert vector == "" + end + + test "does not snowball when the per-run budget is not exhausted", %{ + run: run + } do + for n <- 1..3, do: append_log(run, "logline modest backlog #{n}") + + Oban.Testing.with_testing_mode(:manual, fn -> + assert {:ok, 3} = perform_job(SearchVectorWorker, %{}) + + refute_enqueued(worker: SearchVectorWorker) + end) + end + + test "snowballs an immediate follow-up when the per-run budget is exhausted", + %{run: run} do + # config/test.exs sets batch_size: 2, max_batches: 2. With 5 pending rows + # the run fills two full batches (4 rows), reaches max_batches, and trips + # the budget guard, leaving backlog behind and enqueuing a snowball. + for n <- 1..5, do: append_log(run, "logline overflowing backlog #{n}") + + Oban.Testing.with_testing_mode(:manual, fn -> + assert {:ok, 4} = perform_job(SearchVectorWorker, %{}) + + assert_enqueued( + worker: SearchVectorWorker, + args: %{"trigger" => "snowball"} + ) + end) + end + + test "is idempotent: a second run with nothing pending fills 0 and snowballs nothing", + %{run: run} do + for n <- 1..3, do: append_log(run, "logline #{n}") + + assert {:ok, 3} = perform_job(SearchVectorWorker, %{}) + + Oban.Testing.with_testing_mode(:manual, fn -> + assert {:ok, 0} = perform_job(SearchVectorWorker, %{}) + + refute_enqueued(worker: SearchVectorWorker) + end) + end + end + + describe "snowball uniqueness" do + # Guards the snowball chain: an executing job must be able to enqueue its + # successor. Oban's default unique states include :executing, so a snowball + # would otherwise match itself and the chain would die after one hop. + test "an executing snowball does not block enqueuing its successor" do + Oban.Testing.with_testing_mode(:manual, fn -> + {:ok, running} = + Oban.insert( + Lightning.Oban, + SearchVectorWorker.new(%{"trigger" => "snowball"}) + ) + + # Mimic Oban marking the job as executing while perform/1 runs. + from(j in Oban.Job, where: j.id == ^running.id) + |> Repo.update_all(set: [state: "executing"]) + + {:ok, successor} = + Oban.insert( + Lightning.Oban, + SearchVectorWorker.new(%{"trigger" => "snowball"}) + ) + + refute successor.conflict? + refute successor.id == running.id + end) + end + + test "two queued snowballs are deduped to one" do + Oban.Testing.with_testing_mode(:manual, fn -> + {:ok, first} = + Oban.insert( + Lightning.Oban, + SearchVectorWorker.new(%{"trigger" => "snowball"}) + ) + + {:ok, second} = + Oban.insert( + Lightning.Oban, + SearchVectorWorker.new(%{"trigger" => "snowball"}) + ) + + assert second.conflict? + assert second.id == first.id + end) + end + end +end diff --git a/test/lightning/runs_test.exs b/test/lightning/runs_test.exs index 6029d971855..b3a3477fd3b 100644 --- a/test/lightning/runs_test.exs +++ b/test/lightning/runs_test.exs @@ -13,6 +13,34 @@ defmodule Lightning.RunsTest do alias Lightning.WorkOrders alias Lightning.Workflows + # LogLine's Ecto schema has no search_vector field, so assert against the + # column via raw SQL. + defp search_vector_null?(id) do + %{rows: [[is_null]]} = + Lightning.Repo.query!( + "SELECT search_vector IS NULL FROM log_lines WHERE id = $1::uuid", + [Ecto.UUID.dump!(id)] + ) + + is_null + end + + defp log_line_searchable?(id, term) do + %{rows: [[matches]]} = + Lightning.Repo.query!( + """ + SELECT COALESCE( + search_vector @@ to_tsquery('english_nostop', $2), + false + ) + FROM log_lines WHERE id = $1::uuid + """, + [Ecto.UUID.dump!(id), term] + ) + + matches + end + describe "enqueue/1" do test "enqueues a run" do dataclip = insert(:dataclip) @@ -878,6 +906,61 @@ defmodule Lightning.RunsTest do assert log_line.message == ~s<{"foo":"bar"}> end + + test "leaves search_vector NULL at insert (deferred indexing)" do + dataclip = insert(:dataclip) + %{triggers: [trigger], jobs: [_job]} = workflow = insert(:simple_workflow) + + %{runs: [run]} = + work_order_for(trigger, workflow: workflow, dataclip: dataclip) + |> insert() + + {:ok, log_line} = + Runs.append_run_log(run, %{ + message: "a searchable logline message", + timestamp: DateTime.utc_now() |> DateTime.to_unix(:millisecond) + }) + + # search_vector is computed out-of-band by SearchVectorWorker, so it + # starts NULL and isn't matched by a full-text query yet. + assert search_vector_null?(log_line.id) + refute log_line_searchable?(log_line.id, "searchable") + end + end + + describe "append_run_logs_batch/3" do + test "inserts all lines, broadcasts log_appended, and defers search_vector" do + dataclip = insert(:dataclip) + %{triggers: [trigger]} = workflow = insert(:simple_workflow) + + %{runs: [run]} = + work_order_for(trigger, workflow: workflow, dataclip: dataclip) + |> insert() + + Runs.subscribe(run) + + now = DateTime.utc_now() |> DateTime.to_unix(:millisecond) + + entries = [ + %{message: "batch logline alpha", timestamp: now}, + %{message: "batch logline beta", timestamp: now + 1}, + %{message: "batch logline gamma", timestamp: now + 2} + ] + + {:ok, log_lines} = Runs.append_run_logs_batch(run, entries) + + assert length(log_lines) == 3 + assert Enum.map(log_lines, & &1.message) == Enum.map(entries, & &1.message) + + for _ <- entries do + assert_received %Runs.Events.LogAppended{} + end + + for log_line <- log_lines do + assert search_vector_null?(log_line.id) + refute log_line_searchable?(log_line.id, "logline") + end + end end describe "mark_unfinished_steps_lost/1" do diff --git a/test/support/test_utils.ex b/test/support/test_utils.ex index 3f5c50af19a..0484517767f 100644 --- a/test/support/test_utils.ex +++ b/test/support/test_utils.ex @@ -1,6 +1,8 @@ defmodule Lightning.TestUtils do @moduledoc false + alias Lightning.LogLines.SearchVectorWorker + @doc """ Assert that the given context has the given keys, otherwise raise an error. @@ -19,6 +21,26 @@ defmodule Lightning.TestUtils do :ok end + @doc """ + Drain the deferred `log_lines.search_vector` backlog synchronously. + + Log lines are inserted with `search_vector` left NULL and indexed asynchronously + by `Lightning.LogLines.SearchVectorWorker`. In tests that insert log lines and then + query log search, call this after inserting (and before searching) so the vector is + populated within the SQL sandbox. + + Runs the worker in-process via `Oban.Testing.perform_job/3`, so it sees the + uncommitted sandbox rows. Returns the number of rows indexed; a no-op (`0`) when + nothing is pending, so it is safe to call unconditionally. + """ + @spec flush_log_search_index() :: non_neg_integer() + def flush_log_search_index do + {:ok, indexed} = + Oban.Testing.perform_job(SearchVectorWorker, %{}, repo: Lightning.Repo) + + indexed + end + @doc """ Merge the given setups into the given context. Just works a bit like `setup` on ExUnit.Case.