Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ and this project adheres to

### Fixed

- Reduce `run:log` channel timeouts under heavy log volume by moving `log_lines`
search indexing off the insert path. The full-text search vector is now
backfilled by a background worker rather than computed synchronously on every
insert, so log search is eventually consistent (typically within a minute).
[#4425](https://github.com/OpenFn/lightning/issues/4425)
- Channel join crashes when multiple users open the same workflow concurrently
[#4802](https://github.com/OpenFn/lightning/issues/4802)
- Fix `purge_deleted` Oban job crashing when a soft-deleted project has
Expand Down
2 changes: 2 additions & 0 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,8 @@ config :lightning, :is_resettable_demo, false
config :lightning, :default_retention_period, nil
config :lightning, :claim_work_mem, nil

config :lightning, :log_lines_search_indexing, batch_size: 2_500, max_batches: 10

config :lightning, Lightning.Runtime.RuntimeManager, start: false

config :lightning, LightningWeb.CollectionsController,
Expand Down
4 changes: 4 additions & 0 deletions config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,10 @@ config :lightning, CLI, child_process_mod: FakeRambo

config :lightning, :is_resettable_demo, true

# Tiny budget so a handful of pending rows fills multiple full batches and trips
# the per-run budget guard, exercising the snowball follow-up path.
config :lightning, :log_lines_search_indexing, batch_size: 2, max_batches: 2

config :lightning, :github_app,
app_id: "111111",
app_name: "test-github",
Expand Down
24 changes: 24 additions & 0 deletions lib/lightning/config.ex
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,20 @@ defmodule Lightning.Config do
Application.get_env(:lightning, :activity_cleanup_chunk_size)
end

# Rows per indexing batch; raises `KeyError` when `:batch_size` is not configured.
@impl true
def log_lines_search_indexing_batch_size do
  Keyword.fetch!(log_lines_search_indexing_config(), :batch_size)
end

# Batches allowed per run; raises `KeyError` when `:max_batches` is not configured.
@impl true
def log_lines_search_indexing_max_batches do
  Keyword.fetch!(log_lines_search_indexing_config(), :max_batches)
end

# Reads the `:log_lines_search_indexing` keyword list from the `:lightning`
# application env, falling back to an empty list when it is unset.
defp log_lines_search_indexing_config,
  do: Application.get_env(:lightning, :log_lines_search_indexing, [])

@impl true
def default_ecto_database_timeout do
Application.get_env(:lightning, Lightning.Repo) |> Keyword.get(:timeout)
Expand Down Expand Up @@ -483,6 +497,8 @@ defmodule Lightning.Config do
@callback promex_enabled?() :: boolean()
@callback purge_deleted_after_days() :: integer()
@callback activity_cleanup_chunk_size() :: integer()
@callback log_lines_search_indexing_batch_size() :: pos_integer()
@callback log_lines_search_indexing_max_batches() :: pos_integer()
@callback default_ecto_database_timeout() :: integer()
@callback repo_connection_token_signer() :: Joken.Signer.t()
@callback reset_password_token_validity_in_days() :: integer()
Expand Down Expand Up @@ -597,6 +613,14 @@ defmodule Lightning.Config do
impl().activity_cleanup_chunk_size()
end

# Delegates to the configured implementation module.
def log_lines_search_indexing_batch_size,
  do: impl().log_lines_search_indexing_batch_size()

# Delegates to the configured implementation module.
def log_lines_search_indexing_max_batches,
  do: impl().log_lines_search_indexing_max_batches()

# Delegates to the configured implementation module.
def default_ecto_database_timeout,
  do: impl().default_ecto_database_timeout()
Expand Down
6 changes: 4 additions & 2 deletions lib/lightning/config/bootstrap.ex
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,8 @@ defmodule Lightning.Config.Bootstrap do
args: %{"type" => "monthly_project_digest"}},
# TODO - move this into an ENV?
{"17 */2 * * *", Lightning.Projects, args: %{"type" => "data_retention"}},
{"*/10 * * * *", Lightning.KafkaTriggers.DuplicateTrackingCleanupWorker}
{"*/10 * * * *", Lightning.KafkaTriggers.DuplicateTrackingCleanupWorker},
{"* * * * *", Lightning.LogLines.SearchVectorWorker}
]

cleanup_cron =
Expand Down Expand Up @@ -300,7 +301,8 @@ defmodule Lightning.Config.Bootstrap do
workflow_failures: 1,
background: 1,
history_exports: 1,
ai_assistant: 10
ai_assistant: 10,
search_indexing: 1
]

# https://plausible.io/ is an open-source, privacy-friendly alternative to
Expand Down
90 changes: 90 additions & 0 deletions lib/lightning/log_lines/search_vector_worker.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
defmodule Lightning.LogLines.SearchVectorWorker do
  @moduledoc """
  Backfills the full-text `search_vector` on `log_lines` rows.

  Log lines are inserted with `search_vector` left `NULL`; the vector is built
  here rather than on the insert path, keeping `to_tsvector` off the hot path of
  high-volume log ingestion. Search is eventually consistent as a result,
  typically catching up within a minute.

  Two database objects support this: `safe_to_tsvector(regconfig, text)`, which
  builds the vector while tolerating NULL and oversized input, and a partial
  index over `search_vector IS NULL`, which keeps locating pending rows cheap as
  the table grows. Vectors use the `english_nostop` config to match the read
  side (`Lightning.Invocation`), which queries with
  `to_tsquery('english_nostop', ...)`.

  Each run drains pending rows newest-first, in batches, up to a per-run budget
  (batch size and max batches are configurable via `Lightning.Config`). A run
  that exhausts its budget leaves backlog behind and enqueues an immediate
  follow-up ("snowball"); otherwise the every-minute cron tick keeps pace.

  The worker runs on the dedicated `search_indexing` queue at concurrency 1, so
  only one job executes at a time. Follow-up jobs are tagged with
  `"trigger" => "snowball"`; uniqueness is keyed on `trigger` and limited to
  queued states, so a duplicate follow-up is never queued while a running job
  can still enqueue its successor.

  Enqueueing the follow-up is best-effort: a failure is logged as a warning and
  the job still succeeds, leaving the remaining backlog to the next cron tick.
  """

  use Oban.Worker,
    queue: :search_indexing,
    priority: 1,
    max_attempts: 10,
    # Restrict uniqueness to queued states. Oban's defaults also dedup against
    # :executing/:completed, so a running snowball would match itself and fail
    # to enqueue its successor — breaking the chain after one hop.
    unique: [period: 55, keys: [:trigger], states: [:available, :scheduled]]

  alias Lightning.Repo

  require Logger

  # Fills one batch of pending rows. `$1` is the batch size. SKIP LOCKED lets
  # the statement pass over rows that are currently locked instead of waiting.
  @drain_sql """
  WITH pending AS (
  SELECT id, run_id FROM log_lines
  WHERE search_vector IS NULL
  ORDER BY timestamp DESC
  LIMIT $1 FOR UPDATE SKIP LOCKED
  )
  UPDATE log_lines l
  SET search_vector = safe_to_tsvector('public.english_nostop'::regconfig, l.message)
  FROM pending p WHERE l.id = p.id AND l.run_id = p.run_id
  """

  @doc """
  Drains pending `log_lines` rows within the configured per-run budget.

  Returns `{:ok, filled}` where `filled` is the number of rows whose
  `search_vector` was populated by this run. When the budget is exhausted a
  follow-up job is enqueued before returning.
  """
  @impl Oban.Worker
  def perform(%Oban.Job{}) do
    batch_size = Lightning.Config.log_lines_search_indexing_batch_size()
    max_batches = Lightning.Config.log_lines_search_indexing_max_batches()

    {filled, budget_exhausted?} = drain(0, 0, batch_size, max_batches)

    Logger.info(fn ->
      # coveralls-ignore-start
      "LogLines.SearchVectorWorker filled #{filled} search_vector row(s)."
      # coveralls-ignore-stop
    end)

    # Budget exhausted, so backlog likely remains: enqueue an immediate
    # follow-up rather than waiting for the next cron tick.
    if budget_exhausted?, do: enqueue_follow_up()

    {:ok, filled}
  end

  # Enqueues the snowball follow-up. A failed insert is logged rather than
  # raised: the rows already filled are committed, and the cron tick will pick
  # up whatever backlog remains.
  defp enqueue_follow_up do
    case Oban.insert(Lightning.Oban, __MODULE__.new(%{"trigger" => "snowball"})) do
      {:ok, _job} ->
        :ok

      {:error, reason} ->
        Logger.warning(fn ->
          # coveralls-ignore-start
          "LogLines.SearchVectorWorker failed to enqueue follow-up: " <>
            inspect(reason)

          # coveralls-ignore-stop
        end)

        :ok
    end
  end

  # Drains up to max_batches batches, accumulating the number of rows filled.
  # Returns {filled, budget_exhausted?}. Stops early when a batch fills fewer
  # than batch_size rows (backlog drained).
  defp drain(filled, batches, _batch_size, max_batches)
       when batches >= max_batches do
    {filled, true}
  end

  defp drain(filled, batches, batch_size, max_batches) do
    %{num_rows: num_rows} = Repo.query!(@drain_sql, [batch_size])

    if num_rows < batch_size do
      {filled + num_rows, false}
    else
      drain(filled + num_rows, batches + 1, batch_size, max_batches)
    end
  end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
defmodule Lightning.Repo.Migrations.AddSafeToTsvectorFunction do
  use Ecto.Migration

  # The function is intentionally not declared STRICT. A STRICT function yields
  # NULL for a NULL doc, so search_vector would stay NULL forever and the row
  # would never leave the pending index. COALESCE guarantees a non-NULL
  # tsvector, and the exception handler returns an empty vector when
  # to_tsvector raises program_limit_exceeded.
  @create_function """
  CREATE OR REPLACE FUNCTION safe_to_tsvector(config regconfig, doc text) RETURNS tsvector
  LANGUAGE plpgsql IMMUTABLE AS $$
  BEGIN
  RETURN to_tsvector(config, COALESCE(doc, ''));
  EXCEPTION WHEN program_limit_exceeded THEN
  RETURN ''::tsvector;
  END;
  $$;
  """

  @drop_function "DROP FUNCTION IF EXISTS safe_to_tsvector(regconfig, text);"

  def up, do: execute(@create_function)

  def down, do: execute(@drop_function)
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
defmodule Lightning.Repo.Migrations.AddLogLinesPendingSearchIndex do
  use Ecto.Migration

  # CREATE INDEX CONCURRENTLY cannot run inside a transaction.
  @disable_ddl_transaction true
  @disable_migration_lock true

  @num_partitions 100

  def up do
    # Start with a partial index declared on the parent alone (ONLY); the
    # per-partition indexes are attached to it further down.
    execute("""
    CREATE INDEX IF NOT EXISTS log_lines_pending_search_idx
    ON ONLY log_lines (timestamp)
    WHERE search_vector IS NULL
    """)

    # CONCURRENTLY cannot be used on the partitioned parent in PG15, so every
    # partition's index is built on its own first and attached afterwards.
    partitions = 1..@num_partitions

    Enum.each(partitions, &build_partition_index/1)
    Enum.each(partitions, &attach_to_parent/1)
  end

  def down do
    # The attached partition indexes go away with the parent index.
    execute("DROP INDEX IF EXISTS log_lines_pending_search_idx")
  end

  # Builds the pending-search index for a single partition.
  defp build_partition_index(n) do
    # An interrupted CREATE INDEX CONCURRENTLY leaves an INVALID index behind.
    # IF NOT EXISTS would then skip the rebuild and the later ATTACH would mark
    # the parent INVALID as well, so remove any invalid leftover up front.
    execute("""
    DO $$
    BEGIN
    IF EXISTS (
    SELECT 1 FROM pg_class c
    JOIN pg_index i ON i.indexrelid = c.oid
    WHERE c.relname = 'log_lines_#{n}_pending_search_idx'
    AND NOT i.indisvalid
    ) THEN
    EXECUTE 'DROP INDEX log_lines_#{n}_pending_search_idx';
    END IF;
    END $$;
    """)

    execute("""
    CREATE INDEX CONCURRENTLY IF NOT EXISTS log_lines_#{n}_pending_search_idx
    ON log_lines_#{n} (timestamp)
    WHERE search_vector IS NULL
    """)
  end

  # Attaches a single partition's index to the parent index.
  defp attach_to_parent(n) do
    execute("""
    ALTER INDEX log_lines_pending_search_idx
    ATTACH PARTITION log_lines_#{n}_pending_search_idx
    """)
  end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
defmodule Lightning.Repo.Migrations.DropLogLinesSearchVectorTrigger do
  use Ecto.Migration

  # Removes the AFTER INSERT trigger (and its function) that populated
  # `log_lines.search_vector` synchronously on every insert.
  def up do
    # SET LOCAL scopes the timeout to this migration's transaction. A plain SET
    # is session-wide and would persist on the pooled connection after commit,
    # silently imposing a 5s lock timeout on whatever uses it next.
    execute("SET LOCAL lock_timeout = '5s'")
    execute("DROP TRIGGER IF EXISTS set_search_vector ON log_lines")
    execute("DROP FUNCTION IF EXISTS update_search_vector()")
  end

  # Restores the trigger function and the trigger that `up` removes.
  def down do
    # CREATE TRIGGER also needs a table lock on log_lines, so apply the same
    # transaction-scoped timeout as `up` to fail fast instead of queueing
    # behind (and blocking) log inserts.
    execute("SET LOCAL lock_timeout = '5s'")

    execute("""
    CREATE OR REPLACE FUNCTION public.update_search_vector()
    RETURNS trigger
    LANGUAGE plpgsql
    AS $function$
    begin
    UPDATE log_lines SET search_vector = to_tsvector('english_nostop', message) WHERE id = NEW.id;
    RETURN NEW;
    end
    $function$ ;
    """)

    execute("""
    CREATE TRIGGER set_search_vector
    AFTER INSERT ON log_lines FOR EACH ROW
    WHEN (NEW."search_vector" IS NULL)
    EXECUTE PROCEDURE update_search_vector();
    """)
  end
end
16 changes: 16 additions & 0 deletions test/lightning/invocation_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -1599,6 +1599,8 @@ defmodule Lightning.InvocationTest do
timestamp: Timex.now()
)

flush_log_search_index()

%{
project: project,
dataclip: dataclip,
Expand Down Expand Up @@ -1695,6 +1697,18 @@ defmodule Lightning.InvocationTest do

test "search on logs does NOT return 'stem' matches... only exact matches",
%{project: project} do
# Positive control: the log vector is populated, so an exact token matches.
# Without this, a regression that leaves search_vector NULL would make the
# negative assertions below pass vacuously.
assert [_found] =
Invocation.search_workorders(
project,
SearchParams.new(%{
"search_term" => "playing",
"search_fields" => ["log"]
})
).entries

assert [] =
Invocation.search_workorders(
project,
Expand Down Expand Up @@ -1943,6 +1957,8 @@ defmodule Lightning.InvocationTest do
timestamp: Timex.now()
)

flush_log_search_index()

%{
project: project,
workorder: workorder,
Expand Down
Loading