diff --git a/.changeset/ets-inspector-mailbox-overload.md b/.changeset/ets-inspector-mailbox-overload.md new file mode 100644 index 0000000000..2edcc2c8ec --- /dev/null +++ b/.changeset/ets-inspector-mailbox-overload.md @@ -0,0 +1,10 @@ +--- +'@core/sync-service': patch +--- + +Stop the EtsInspector from overloading its mailbox and re-running database lookups during cold-start bursts or while Postgres is degraded (#4370): + +- Concurrent lookups of the same relation, oid, or feature now share a single database call instead of each running their own. +- Failed lookups (table-not-found and connection errors) are cached briefly, so a burst against a failing key stops hammering the database. +- Each lookup has an explicit transaction timeout. +- Each lookup is recorded as an `inspector.fetch_db` span so its latency and outcome are visible in telemetry. diff --git a/packages/sync-service/lib/electric/postgres/inspector/ets_inspector.ex b/packages/sync-service/lib/electric/postgres/inspector/ets_inspector.ex index 3632dc6f15..1adf3262de 100644 --- a/packages/sync-service/lib/electric/postgres/inspector/ets_inspector.ex +++ b/packages/sync-service/lib/electric/postgres/inspector/ets_inspector.ex @@ -13,7 +13,31 @@ defmodule Electric.Postgres.Inspector.EtsInspector do import Electric, only: :macros require Logger + # Inspector lookups run on the shared metadata pool, which holds at most 4 + # connections (see `Electric.Connection.Manager.pool_sizes/1`). With request + # coalescing a single in-flight key holds one of those connections for the + # whole lookup, so a handful of distinct cold-cache keys hitting a degraded + # Postgres can pin the entire metadata pool — starving the connection + # manager's own admin queries that share it. Cap each lookup well below + # Postgrex's inherited 15s default so a slow connection is returned to the + # pool promptly and the failure gets negative-cached instead of held open. + # Healthy catalog reads are milliseconds, so 5s is ample headroom; this is + # deliberately a pool-protection bound, not the caller's request budget (a + # shape request tolerates its full long-poll timeout, 20-60s). + @fetch_db_timeout 5_000 + + # How long terminal negative results (table-not-found, connection errors) are + # cached so a burst against a failing key drains the mailbox instead of + # refilling it at the same rate. Short, because a connection error may clear. + @default_negative_cache_ttl_ms 1_000 + + # How often expired negative-cache entries are physically reclaimed from the + # table. They are bounded by this interval rather than living forever, even + # though the negative key space is client-controlled (distinct table names). + @negative_cache_sweep_interval_ms 60_000 + alias Electric.Postgres.Inspector + alias Electric.Telemetry.OpenTelemetry alias Electric.PersistentKV alias Electric.Postgres.Inspector.DirectInspector @@ -24,6 +48,17 @@ defmodule Electric.Postgres.Inspector.EtsInspector do Electric.ProcessRegistry.name(stack_ref, __MODULE__) end + @doc """ + Name of the `Task.Supervisor` that runs the inspector's DB-lookup workers. + + Declared as a sibling child of the inspector in `Electric.StackSupervisor` + rather than started ad-hoc from `init/1`, so the process hierarchy stays + visible in the supervision tree. + """ + def task_supervisor_name(stack_ref) do + Electric.ProcessRegistry.name(stack_ref, __MODULE__.TaskSupervisor) + end + def start_link(opts) do {:ok, pid} = GenServer.start_link( @@ -42,8 +77,9 @@ defmodule Electric.Postgres.Inspector.EtsInspector do @spec load_relation_oid(Electric.relation(), opts :: term()) :: {:ok, Electric.oid_relation()} | :table_not_found | {:error, term()} def load_relation_oid(relation, opts) when is_relation(relation) do - with :not_in_cache <- fetch_normalized_relation_from_ets(relation, opts) do - GenServer.call(opts[:server], {:load_relation_oid, relation}, :infinity) + with :not_in_cache <- fetch_normalized_relation_from_ets(relation, opts), + :not_in_cache <- fetch_negative_cache({:rel, relation}, opts) do + GenServer.call(opts[:server], {:load, {:rel, relation}, :relation_oid}, :infinity) end end @@ -51,8 +87,9 @@ defmodule Electric.Postgres.Inspector.EtsInspector do @spec load_relation_info(Electric.relation_id(), opts :: term()) :: {:ok, Inspector.relation_info()} | :table_not_found | {:error, term()} def load_relation_info(oid, opts) when is_relation_id(oid) do - with :not_in_cache <- fetch_relation_info_from_ets(oid, opts) do - GenServer.call(opts[:server], {:load_relation_info, oid}, :infinity) + with :not_in_cache <- fetch_relation_info_from_ets(oid, opts), + :not_in_cache <- fetch_negative_cache({:oid, oid}, opts) do + GenServer.call(opts[:server], {:load, {:oid, oid}, :relation_info}, :infinity) end end @@ -60,8 +97,9 @@ defmodule Electric.Postgres.Inspector.EtsInspector do @spec load_column_info(Electric.relation_id(), opts :: term()) :: {:ok, [Inspector.column_info()]} | :table_not_found | {:error, term()} def load_column_info(oid, opts) when is_relation_id(oid) do - with :not_in_cache <- fetch_column_info_from_ets(oid, opts) do - GenServer.call(opts[:server], {:load_column_info, oid}, :infinity) + with :not_in_cache <- fetch_column_info_from_ets(oid, opts), + :not_in_cache <- fetch_negative_cache({:oid, oid}, opts) do + GenServer.call(opts[:server], {:load, {:oid, oid}, :column_info}, :infinity) end end @@ -69,8 +107,9 @@ defmodule Electric.Postgres.Inspector.EtsInspector do @spec load_supported_features(opts :: term()) :: {:ok, Map.t()} | {:error, String.t() | :connection_not_available} def load_supported_features(opts) do - with :not_in_cache <- fetch_supported_features_from_ets(opts) do - GenServer.call(opts[:server], :load_supported_features, :infinity) + with :not_in_cache <- fetch_supported_features_from_ets(opts), + :not_in_cache <- fetch_negative_cache(:supported_features, opts) do + GenServer.call(opts[:server], {:load, :supported_features, :supported_features}, :infinity) end end @@ -113,58 +152,30 @@ defmodule Electric.Postgres.Inspector.EtsInspector do state = %{ + stack_id: opts.stack_id, pg_inspector_table: pg_inspector_table, pg_pool: opts.pool, persistent_kv: opts.persistent_kv, - persistence_key: persistence_key + persistence_key: persistence_key, + task_sup: task_supervisor_name(opts.stack_id), + in_flight: %{}, + in_flight_refs: %{}, + negative_cache_ttl_ms: + Map.get(opts, :negative_cache_ttl_ms, @default_negative_cache_ttl_ms) } |> restore_persistent_state() + schedule_negative_cache_sweep() + {:ok, state} end @impl GenServer - def handle_call({:load_relation_oid, rel}, _from, state) do - response = - with :not_in_cache <- fetch_normalized_relation_from_ets(rel, state), - :ok <- fill_cache(rel, state) do - fetch_normalized_relation_from_ets(rel, state) - end - - {:reply, response, state} - end - - def handle_call({:load_relation_info, oid}, _from, state) do - response = - with :not_in_cache <- fetch_relation_info_from_ets(oid, state), - :ok <- fill_cache(oid, state) do - fetch_relation_info_from_ets(oid, state) - end - - {:reply, response, state} - end - - def handle_call({:load_column_info, oid}, _from, state) do - response = - with :not_in_cache <- fetch_column_info_from_ets(oid, state), - :ok <- fill_cache(oid, state) do - fetch_column_info_from_ets(oid, state) - end - - {:reply, response, state} - end - - def handle_call(:load_supported_features, _from, state) do - response = - with :not_in_cache <- fetch_supported_features_from_ets(state), - {:ok, features} <- - wrap_in_db_errors(fn -> DirectInspector.load_supported_features(state.pg_pool) end) do - store_supported_features(state, features) - persist_data(state) - {:ok, features} - end - - {:reply, response, state} + def handle_call({:load, key, reader}, from, state) do + case read_cached(key, reader, state) do + :not_in_cache -> {:noreply, enqueue_waiter(state, key, {from, reader})} + response -> {:reply, response, state} + end end def handle_call({:clean, oid}, _from, state) do @@ -218,42 +229,205 @@ defmodule Electric.Postgres.Inspector.EtsInspector do end @impl GenServer + def handle_info({ref, result}, state) when is_reference(ref) do + Process.demonitor(ref, [:flush]) + + case pop_in_flight_by_ref(state, ref) do + {nil, state} -> + {:noreply, state} + + {{key, entry}, state} -> + state = apply_fill_result(state, key, result) + reply_waiters(state, key, result, entry.waiters) + {:noreply, state} + end + end + + def handle_info({:DOWN, ref, :process, _pid, reason}, state) do + case pop_in_flight_by_ref(state, ref) do + {nil, state} -> + {:noreply, state} + + {{key, entry}, state} -> + Logger.warning( + "EtsInspector fill worker for #{inspect(key)} exited before replying: #{inspect(reason)}" + ) + + for {from, _reader} <- entry.waiters do + GenServer.reply(from, {:error, :connection_not_available}) + end + + {:noreply, state} + end + end + + def handle_info(:sweep_negative_cache, state) do + # Negative entries are logically ignored once expired, but the table is + # `:protected` so only we can physically reclaim them. Without this sweep a + # client requesting many distinct non-existent tables would grow the table + # unboundedly. Reschedule unconditionally so the timer keeps running. + now = System.monotonic_time(:millisecond) + + :ets.select_delete(inspector_table(state), [ + {{{:negative, :_}, :_, :"$1"}, [{:<, :"$1", now}], [true]} + ]) + + schedule_negative_cache_sweep() + {:noreply, state} + end + def handle_info({:EXIT, _, reason}, state) do {:stop, reason, state} end - defp fill_cache(rel_or_oid, state) when is_relation(rel_or_oid) or is_relation_id(rel_or_oid) do - case fetch_from_db(rel_or_oid, state) do - {:ok, {rel, cols}} -> - state - |> store_relation_info(rel, cols) - |> persist_data() + defp schedule_negative_cache_sweep do + Process.send_after(self(), :sweep_negative_cache, @negative_cache_sweep_interval_ms) + end + + # Coalesce concurrent loads of the same key: the first waiter spawns one + # supervised worker to do the DB lookup; later waiters for the same key just + # park their `from` and are all answered when the worker reports back. This + # keeps the inspector's mailbox population proportional to unique in-flight + # keys rather than to in-flight requests, and avoids head-of-line blocking on + # a single slow lookup. `in_flight_refs` maps the worker's monitor ref back to + # its key so completion/crash handling is O(1). + defp enqueue_waiter(state, key, waiter) do + case Map.fetch(state.in_flight, key) do + {:ok, entry} -> + entry = %{entry | waiters: [waiter | entry.waiters]} + %{state | in_flight: Map.put(state.in_flight, key, entry)} + + :error -> + %{ref: ref} = + Task.Supervisor.async_nolink(state.task_sup, fn -> + fetch_for_key(key, state.pg_pool, state.stack_id) + end) + + entry = %{waiters: [waiter], ref: ref} + + %{ + state + | in_flight: Map.put(state.in_flight, key, entry), + in_flight_refs: Map.put(state.in_flight_refs, ref, key) + } + end + end + + defp pop_in_flight_by_ref(state, ref) do + case Map.pop(state.in_flight_refs, ref) do + {nil, _refs} -> + {nil, state} - {:ok, :table_not_found} -> - :table_not_found + {key, in_flight_refs} -> + {entry, in_flight} = Map.pop(state.in_flight, key) + {{key, entry}, %{state | in_flight: in_flight, in_flight_refs: in_flight_refs}} + end + end + + # A cache miss runs the DB lookup in a detached worker, so it's outside the + # request's trace; and one lookup serves every coalesced waiter, so it can't + # belong to a single request anyway. Record it as a standalone span to expose + # catalog-query latency and outcome in prod (there is no other signal isolating + # these queries). + defp fetch_for_key(key, pool, stack_id) do + OpenTelemetry.with_span( + "inspector.fetch_db", + [{"inspector.key_type", key_type(key)}], + stack_id, + fn -> + result = do_fetch_for_key(key, pool) + OpenTelemetry.add_span_attributes(%{"inspector.result" => fetch_outcome(result)}) + result + end + ) + end + + defp do_fetch_for_key({:rel, rel}, pool), do: fetch_from_db(rel, pool) + defp do_fetch_for_key({:oid, oid}, pool), do: fetch_from_db(oid, pool) + + defp do_fetch_for_key(:supported_features, pool) do + wrap_in_db_errors(fn -> DirectInspector.load_supported_features(pool) end) + end + + defp key_type({:rel, _}), do: "relation" + defp key_type({:oid, _}), do: "oid" + defp key_type(:supported_features), do: "supported_features" + + defp fetch_outcome({:ok, :table_not_found}), do: "table_not_found" + defp fetch_outcome({:ok, _}), do: "ok" + defp fetch_outcome({:error, _}), do: "error" + + defp apply_fill_result(state, :supported_features, {:ok, features}) do + state |> store_supported_features(features) |> persist_data() + state + end + + defp apply_fill_result(state, _key, {:ok, {rel, cols}}) do + state |> store_relation_info(rel, cols) |> persist_data() + state + end + + # Terminal negative results (table-not-found / DB error) are cached for a short + # TTL so a burst against a failing key drains the mailbox instead of refilling + # it; the client reads this cache and short-circuits before messaging us again. + defp apply_fill_result(state, key, {:ok, :table_not_found}), + do: put_negative_cache(state, key, :table_not_found) + + defp apply_fill_result(state, key, {:error, reason}), + do: put_negative_cache(state, key, {:error, reason}) + + defp reply_waiters(state, key, result, waiters) do + for {from, reader} <- waiters do + GenServer.reply(from, waiter_response(state, key, reader, result)) + end + + :ok + end - {:error, err} -> - {:error, err} + defp waiter_response(_state, _key, _reader, {:ok, :table_not_found}), do: :table_not_found + defp waiter_response(_state, _key, _reader, {:error, reason}), do: {:error, reason} + defp waiter_response(state, key, reader, {:ok, _payload}), do: read_from_ets(key, reader, state) + + defp read_from_ets({:rel, rel}, :relation_oid, state), + do: fetch_normalized_relation_from_ets(rel, state) + + defp read_from_ets({:oid, oid}, :relation_info, state), + do: fetch_relation_info_from_ets(oid, state) + + defp read_from_ets({:oid, oid}, :column_info, state), + do: fetch_column_info_from_ets(oid, state) + + defp read_from_ets(:supported_features, :supported_features, state), + do: fetch_supported_features_from_ets(state) + + defp read_cached(key, reader, state) do + case read_from_ets(key, reader, state) do + :not_in_cache -> fetch_negative_cache(key, state) + response -> response end end - defp fetch_from_db(rel_or_oid, state) + defp fetch_from_db(rel_or_oid, pool) when is_relation(rel_or_oid) or is_relation_id(rel_or_oid) do wrap_in_db_errors(fn -> - Postgrex.transaction(state.pg_pool, fn conn -> - loader_fn = - if is_relation(rel_or_oid), - do: &DirectInspector.normalize_and_load_relation_info/2, - else: &DirectInspector.load_relation_info/2 - - with {:ok, rel} <- loader_fn.(rel_or_oid, conn), - {:ok, cols} <- DirectInspector.load_column_info(rel.relation_id, conn) do - {rel, cols} - else - {:error, err} -> Postgrex.rollback(conn, err) - :table_not_found -> :table_not_found - end - end) + Postgrex.transaction( + pool, + fn conn -> + loader_fn = + if is_relation(rel_or_oid), + do: &DirectInspector.normalize_and_load_relation_info/2, + else: &DirectInspector.load_relation_info/2 + + with {:ok, rel} <- loader_fn.(rel_or_oid, conn), + {:ok, cols} <- DirectInspector.load_column_info(rel.relation_id, conn) do + {rel, cols} + else + {:error, err} -> Postgrex.rollback(conn, err) + :table_not_found -> :table_not_found + end + end, + timeout: @fetch_db_timeout + ) end) end @@ -282,10 +456,38 @@ defmodule Electric.Postgres.Inspector.EtsInspector do @spec persist_data(map()) :: :ok defp persist_data(state) do - inspector_data = :ets.tab2list(state.pg_inspector_table) + # Negative-cache entries are transient and their `expires_at` is a monotonic + # timestamp that is meaningless across restarts, so they are never persisted. + inspector_data = + state.pg_inspector_table + |> :ets.tab2list() + |> Enum.reject(&match?({{:negative, _}, _, _}, &1)) + PersistentKV.set(state.persistent_kv, state.persistence_key, version: 1, data: inspector_data) end + defp negative_cache_key(key), do: {:negative, key} + + defp put_negative_cache(state, key, result) do + expires_at = System.monotonic_time(:millisecond) + state.negative_cache_ttl_ms + :ets.insert(inspector_table(state), {negative_cache_key(key), result, expires_at}) + state + end + + @spec fetch_negative_cache(term(), opts :: term()) :: + :table_not_found | {:error, term()} | :not_in_cache + defp fetch_negative_cache(key, opts) do + case :ets.lookup(inspector_table(opts), negative_cache_key(key)) do + [{_key, result, expires_at}] -> + if System.monotonic_time(:millisecond) < expires_at, do: result, else: :not_in_cache + + [] -> + :not_in_cache + end + rescue + ArgumentError -> :not_in_cache + end + @spec restore_persistent_state(map()) :: map() defp restore_persistent_state(state) do case PersistentKV.get(state.persistent_kv, state.persistence_key) do @@ -345,6 +547,13 @@ defmodule Electric.Postgres.Inspector.EtsInspector do @spec delete_relation_info(map(), Electric.relation_id()) :: map() defp delete_relation_info(state, oid) when is_relation_id(oid) do + # `clean` is the cache-invalidation primitive the schema reconciler calls when + # a relation diverges, so it must also drop any cached negative result for the + # oid — otherwise the next lookup would short-circuit on a stale `:table_not_found` + # / error for up to the negative-cache TTL. (Only the oid-keyed negative entry is + # addressable here; a relation-keyed one expires on its own short TTL.) + :ets.delete(inspector_table(state), negative_cache_key({:oid, oid})) + case fetch_relation_info_from_ets(oid, state) do :not_in_cache -> state diff --git a/packages/sync-service/lib/electric/stack_supervisor.ex b/packages/sync-service/lib/electric/stack_supervisor.ex index b6d0111669..fc40974215 100644 --- a/packages/sync-service/lib/electric/stack_supervisor.ex +++ b/packages/sync-service/lib/electric/stack_supervisor.ex @@ -416,6 +416,8 @@ defmodule Electric.StackSupervisor do {Registry, name: shape_changes_registry_name, keys: :duplicate, partitions: registry_partitions}, Electric.ShapeCache.Storage.stack_child_spec(storage), + {Task.Supervisor, + name: Electric.Postgres.Inspector.EtsInspector.task_supervisor_name(stack_id)}, {Electric.Postgres.Inspector.EtsInspector, stack_id: stack_id, pool: metadata_db_pool, persistent_kv: config.persistent_kv}, {Electric.MonitoredCoreSupervisor, diff --git a/packages/sync-service/test/electric/postgres/inspector/ets_inspector_test.exs b/packages/sync-service/test/electric/postgres/inspector/ets_inspector_test.exs index 17e87f102e..a93b1e2f5b 100644 --- a/packages/sync-service/test/electric/postgres/inspector/ets_inspector_test.exs +++ b/packages/sync-service/test/electric/postgres/inspector/ets_inspector_test.exs @@ -22,6 +22,22 @@ defmodule Electric.Postgres.Inspector.EtsInspectorTest do assert is_integer(oid) end + test "bounds the DB transaction with an explicit timeout", %{opts: opts, db_conn: pool} do + test_pid = self() + + Repatch.patch(Postgrex, :transaction, [mode: :shared], fn p, fun, db_opts -> + if p == pool, do: send(test_pid, {:db_timeout, Keyword.get(db_opts, :timeout)}) + Repatch.real(Postgrex.transaction(p, fun, db_opts)) + end) + + allow_fill_workers(opts[:server]) + + assert {:ok, {_oid, {"public", "items"}}} = + EtsInspector.load_relation_oid({"public", "items"}, opts) + + assert_receive {:db_timeout, 5_000} + end + test "caches the relation id for a given relation once accesses", %{opts: opts} do assert {:ok, {oid, {"public", "items"}}} = EtsInspector.load_relation_oid({"public", "items"}, opts) @@ -32,7 +48,7 @@ defmodule Electric.Postgres.Inspector.EtsInspectorTest do raise "should not be called again" end) - Repatch.allow(self(), opts[:server]) + allow_fill_workers(opts[:server]) assert {:ok, {^oid, {"public", "items"}}} = EtsInspector.load_relation_oid({"public", "items"}, opts) @@ -51,11 +67,30 @@ defmodule Electric.Postgres.Inspector.EtsInspectorTest do {:error, %DBConnection.ConnectionError{message: "expected error"}} end) - Repatch.allow(self(), opts[:server]) + allow_fill_workers(opts[:server]) assert {:error, "expected error"} = EtsInspector.load_relation_oid({"public", "items"}, opts) end + + test "caches a not-found result so repeat lookups skip the DB", %{opts: opts, db_conn: pool} do + test_pid = self() + + Repatch.patch(Postgrex, :transaction, [mode: :shared], fn p, fun, db_opts -> + if p == pool, do: send(test_pid, :db_transaction) + Repatch.real(Postgrex.transaction(p, fun, db_opts)) + end) + + allow_fill_workers(opts[:server]) + + assert :table_not_found = EtsInspector.load_relation_oid({"public", "ghost"}, opts) + assert_receive :db_transaction + + # Second lookup within the TTL must be served from the negative cache in the + # client process, without messaging the GenServer or hitting the DB. + assert :table_not_found = EtsInspector.load_relation_oid({"public", "ghost"}, opts) + refute_receive :db_transaction + end end describe "load_relation_info/2" do @@ -74,25 +109,66 @@ defmodule Electric.Postgres.Inspector.EtsInspectorTest do test "concurrent calls load value exactly once", %{ opts: opts, - items_oid: items_oid + items_oid: items_oid, + db_conn: pool } do - Repatch.spy(Postgrex) - Repatch.allow(self(), opts[:server]) + test_pid = self() + + Repatch.patch(Postgrex, :transaction, [mode: :shared], fn p, fun, db_opts -> + if p == pool, do: send(test_pid, :db_transaction) + Repatch.real(Postgrex.transaction(p, fun, db_opts)) + end) + + # Required so the shared patch reaches the supervised fill worker (its + # `$callers` include the GenServer, which we allow here). + allow_fill_workers(opts[:server]) task1 = Task.async(fn -> EtsInspector.load_relation_info(items_oid, opts) end) task2 = Task.async(fn -> EtsInspector.load_relation_info(items_oid, opts) end) assert {:ok, %{relation: {"public", "items"}, relation_id: ^items_oid, kind: :ordinary_table} = - info} = - Task.await(task1) + info} = Task.await(task1) assert {:ok, ^info} = Task.await(task2) # Non-parallel call should return value from cache assert {:ok, ^info} = EtsInspector.load_relation_info(items_oid, opts) - assert Repatch.called?(Postgrex, :transaction, 2, by: opts[:server], exactly: 1) + assert_receive :db_transaction + refute_receive :db_transaction + end + + test "concurrent calls for a missing relation produce a single DB attempt", + %{opts: opts, db_conn: pool} do + test_pid = self() + missing_oid = 1_234_567_890 + + # Hold the first (and only) worker inside the DB call until we've confirmed + # no second worker started. This makes coalescing deterministic: while the + # one worker is parked, every other concurrent request must coalesce onto + # its in-flight entry rather than spawn a fresh DB attempt. + Repatch.patch(Postgrex, :transaction, [mode: :shared], fn p, fun, db_opts -> + if p == pool do + send(test_pid, {:db_transaction, self()}) + receive do: (:proceed -> :ok) + end + + Repatch.real(Postgrex.transaction(p, fun, db_opts)) + end) + + allow_fill_workers(opts[:server]) + + tasks = + for _ <- 1..10 do + Task.async(fn -> EtsInspector.load_relation_info(missing_oid, opts) end) + end + + assert_receive {:db_transaction, worker} + refute_receive {:db_transaction, _}, 200 + + send(worker, :proceed) + assert Enum.all?(Task.await_many(tasks), &(&1 == :table_not_found)) end test "returns a not found marker when the relation does not exist", %{opts: opts} do @@ -104,7 +180,7 @@ defmodule Electric.Postgres.Inspector.EtsInspectorTest do {:error, %DBConnection.ConnectionError{message: "expected error"}} end) - Repatch.allow(self(), opts[:server]) + allow_fill_workers(opts[:server]) assert {:error, "expected error"} = EtsInspector.load_relation_info(1_234_567_890, opts) @@ -207,6 +283,29 @@ defmodule Electric.Postgres.Inspector.EtsInspectorTest do assert :ets.lookup(pg_inspector_table, {:oid_info, oid1}) == [] refute :ets.lookup(pg_inspector_table, {:oid_info, oid2}) == [] end + + test "clears a cached negative result for the oid", %{opts: opts, db_conn: pool} do + test_pid = self() + missing_oid = 1_234_567_890 + + Repatch.patch(Postgrex, :transaction, [mode: :shared], fn p, fun, db_opts -> + if p == pool, do: send(test_pid, :db_transaction) + Repatch.real(Postgrex.transaction(p, fun, db_opts)) + end) + + allow_fill_workers(opts[:server]) + + # Cache a negative result for the missing oid. + assert :table_not_found = EtsInspector.load_relation_info(missing_oid, opts) + assert_receive :db_transaction + + # `clean` must invalidate the negative cache, so the next lookup hits the DB + # again rather than short-circuiting on the stale `:table_not_found`. + assert EtsInspector.clean(missing_oid, opts) + + assert :table_not_found = EtsInspector.load_relation_info(missing_oid, opts) + assert_receive :db_transaction + end end describe "load_column_info/2" do @@ -224,22 +323,29 @@ defmodule Electric.Postgres.Inspector.EtsInspectorTest do test "concurrent calls load value exactly once", %{ opts: opts, - items_oid: items_oid + items_oid: items_oid, + db_conn: pool } do - Repatch.spy(Postgrex) - Repatch.allow(self(), opts[:server]) + test_pid = self() + + Repatch.patch(Postgrex, :transaction, [mode: :shared], fn p, fun, db_opts -> + if p == pool, do: send(test_pid, :db_transaction) + Repatch.real(Postgrex.transaction(p, fun, db_opts)) + end) + + allow_fill_workers(opts[:server]) task1 = Task.async(fn -> EtsInspector.load_column_info(items_oid, opts) end) task2 = Task.async(fn -> EtsInspector.load_column_info(items_oid, opts) end) assert {:ok, [%{name: "id"}, %{name: "value"}] = columns} = Task.await(task1) - assert {:ok, ^columns} = Task.await(task2) # Non-parallel call should return value from cache assert {:ok, ^columns} = EtsInspector.load_column_info(items_oid, opts) - assert Repatch.called?(Postgrex, :transaction, 2, by: opts[:server], exactly: 1) + assert_receive :db_transaction + refute_receive :db_transaction end test "returns a not found marker when the relation does not exist", %{opts: opts} do @@ -251,7 +357,7 @@ defmodule Electric.Postgres.Inspector.EtsInspectorTest do {:error, %DBConnection.ConnectionError{message: "expected error"}} end) - Repatch.allow(self(), opts[:server]) + allow_fill_workers(opts[:server]) assert {:error, "expected error"} = EtsInspector.load_column_info(1_234_567_890, opts) @@ -326,21 +432,27 @@ defmodule Electric.Postgres.Inspector.EtsInspectorTest do end end - test "concurrent calls load value exactly once", %{opts: opts} do - Repatch.spy(Postgrex) - Repatch.allow(self(), opts[:server]) + test "concurrent calls load value exactly once", %{opts: opts, db_conn: pool} do + test_pid = self() + + Repatch.patch(Postgrex, :query, [mode: :shared], fn p, sql, params -> + if p == pool, do: send(test_pid, :db_query) + Repatch.real(Postgrex.query(p, sql, params)) + end) + + allow_fill_workers(opts[:server]) task1 = Task.async(fn -> EtsInspector.load_supported_features(opts) end) task2 = Task.async(fn -> EtsInspector.load_supported_features(opts) end) assert {:ok, %{supports_generated_column_replication: _val} = features} = Task.await(task1) - assert {:ok, ^features} = Task.await(task2) # Non-parallel call should return value from cache assert {:ok, ^features} = EtsInspector.load_supported_features(opts) - assert Repatch.called?(Postgrex, :query, 3, by: opts[:server], exactly: 1) + assert_receive :db_query + refute_receive :db_query end end @@ -613,10 +725,123 @@ defmodule Electric.Postgres.Inspector.EtsInspectorTest do end end + describe "negative cache expiry" do + setup :with_shared_db + setup :in_transaction + setup :with_stack_id_from_test + setup [:with_persistent_kv, :with_basic_tables, :with_sql_execute] + + setup ctx do + start_supervised!({Task.Supervisor, name: EtsInspector.task_supervisor_name(ctx.stack_id)}) + + server = + start_supervised!( + {EtsInspector, + stack_id: ctx.stack_id, + pool: ctx.db_conn, + persistent_kv: ctx.persistent_kv, + negative_cache_ttl_ms: 50} + ) + + %{opts: [stack_id: ctx.stack_id, server: server]} + end + + test "re-attempts the DB after the negative TTL expires", %{opts: opts, db_conn: pool} do + test_pid = self() + + Repatch.patch(Postgrex, :transaction, [mode: :shared], fn p, fun, db_opts -> + if p == pool, do: send(test_pid, :db_transaction) + Repatch.real(Postgrex.transaction(p, fun, db_opts)) + end) + + allow_fill_workers(opts[:server]) + + assert :table_not_found = EtsInspector.load_relation_oid({"public", "ghost"}, opts) + assert_receive :db_transaction + + Process.sleep(80) + + # TTL elapsed: the negative cache entry is stale, so the lookup hits the DB again. + assert :table_not_found = EtsInspector.load_relation_oid({"public", "ghost"}, opts) + assert_receive :db_transaction + end + + test "physically reclaims expired negative-cache entries on sweep", %{opts: opts} do + table = EtsInspector.inspector_table(opts) + neg_key = {:negative, {:rel, {"public", "ghost"}}} + + assert :table_not_found = EtsInspector.load_relation_oid({"public", "ghost"}, opts) + assert [{^neg_key, :table_not_found, _expires_at}] = :ets.lookup(table, neg_key) + + Process.sleep(80) + send(opts[:server], :sweep_negative_cache) + + # A subsequent synchronous call is processed after the sweep message, so the + # expired entry is guaranteed to be gone by the time it returns. + _ = EtsInspector.list_relations_with_stale_cache(opts) + + assert [] = :ets.lookup(table, neg_key) + end + end + + describe "telemetry" do + setup :with_shared_db + setup :in_transaction + setup :with_stack_id_from_test + setup [:with_persistent_kv, :with_inspector, :with_basic_tables, :with_sql_execute] + setup %{inspector: {EtsInspector, opts}}, do: %{opts: opts} + + test "emits a span around each DB lookup tagged with the key type", %{opts: opts} do + test_pid = self() + handler_id = {:inspector_fetch_db_span, opts[:stack_id]} + + :telemetry.attach_many( + handler_id, + [ + [:electric, :inspector, :fetch_db, :start], + [:electric, :inspector, :fetch_db, :stop] + ], + fn event, measurements, metadata, _ -> + send(test_pid, {:fetch_db_span, List.last(event), measurements, metadata}) + end, + nil + ) + + on_exit(fn -> :telemetry.detach(handler_id) end) + + assert {:ok, _} = EtsInspector.load_relation_oid({"public", "items"}, opts) + + # The key type is recorded on the span (start metadata)... + assert_receive {:fetch_db_span, :start, _, %{"inspector.key_type" => "relation"}} + # ...and the span is timed (stop measurements carry the duration). + assert_receive {:fetch_db_span, :stop, %{duration: duration}, _} + assert is_integer(duration) and duration > 0 + end + end + defp with_items_oid(%{db_conn: conn}) do %{rows: [[oid]]} = Postgrex.query!(conn, "SELECT oid FROM pg_class WHERE relname = 'items'", []) %{items_oid: oid} end + + # EtsInspector runs its DB lookups in short-lived `Task.Supervisor.async_nolink` + # workers spawned by the GenServer. A `mode: :shared` Repatch patch does not + # reach a process spawned by an allowed process, so to observe/stub the DB call + # we allow the GenServer and then allow each worker it spawns: patch + # `async_nolink` (which runs in the now-allowed GenServer) to allow the returned + # task's pid. The worker then sees this test's shared patches. + defp allow_fill_workers(server) do + test_pid = self() + Repatch.allow(test_pid, server) + + Repatch.patch(Task.Supervisor, :async_nolink, [mode: :shared], fn supervisor, fun -> + task = Repatch.real(Task.Supervisor.async_nolink(supervisor, fun)) + Repatch.allow(test_pid, task.pid) + task + end) + + :ok + end end diff --git a/packages/sync-service/test/support/component_setup.ex b/packages/sync-service/test/support/component_setup.ex index 284415d51c..444bbd3e03 100644 --- a/packages/sync-service/test/support/component_setup.ex +++ b/packages/sync-service/test/support/component_setup.ex @@ -382,6 +382,15 @@ defmodule Support.ComponentSetup do end def with_inspector(ctx) do + # The inspector's Task.Supervisor is a sibling that outlives inspector + # restarts in production, so only start it if a previous `with_inspector/1` + # call in this test hasn't already (e.g. restart scenarios). + task_supervisor_name = EtsInspector.task_supervisor_name(ctx.stack_id) + + if is_nil(GenServer.whereis(task_supervisor_name)) do + start_supervised!({Task.Supervisor, name: task_supervisor_name}) + end + server = start_supervised!( {EtsInspector, diff --git a/packages/sync-service/test/test_helper.exs b/packages/sync-service/test/test_helper.exs index 9f5d0847ee..c654380e2d 100644 --- a/packages/sync-service/test/test_helper.exs +++ b/packages/sync-service/test/test_helper.exs @@ -55,6 +55,7 @@ if not skip_repatch_prewarm? do Electric.Shapes.Consumer.Snapshotter, Electric.Shapes.DynamicConsumerSupervisor, Electric.Shapes.Shape, + Task.Supervisor, :otel_tracer ] )