diff --git a/.changeset/async-deleter-full-disk-boot.md b/.changeset/async-deleter-full-disk-boot.md new file mode 100644 index 0000000000..9fed6fb842 --- /dev/null +++ b/.changeset/async-deleter-full-disk-boot.md @@ -0,0 +1,5 @@ +--- +"@core/sync-service": patch +--- + +Fix `AsyncDeleter` crashing the stack on boot when the storage volume is full. It now boots resiliently, logs the real error (e.g. `ENOSPC`) instead of a misleading `ENOENT`, and self-heals — creating the trash directory and recapturing pending deletions once disk space is available again. diff --git a/packages/sync-service/lib/electric/async_deleter.ex b/packages/sync-service/lib/electric/async_deleter.ex index 4fda2cff58..14e84088e1 100644 --- a/packages/sync-service/lib/electric/async_deleter.ex +++ b/packages/sync-service/lib/electric/async_deleter.ex @@ -26,7 +26,10 @@ defmodule Electric.AsyncDeleter do timer_ref: nil, cleanup_task: nil, pending: [], - in_progress: [] + in_progress: [], + trash_dir_ready: false, + pending_sources: [], + heal_timer_ref: nil ] @trash_dir_base ".electric_trash" @@ -56,14 +59,22 @@ defmodule Electric.AsyncDeleter do :ok {:error, :enoent} -> - Logger.debug("AsyncDeleter: path already gone #{path}") - :ok + if File.exists?(path) do + # The source still exists, so the :enoent is about the *trash dir* + # being missing — hand the live path to the deleter to capture later. + GenServer.cast(name(stack_id), {:capture_failed, path}) + :ok + else + Logger.debug("AsyncDeleter: path already gone #{path}") + :ok + end {:error, reason} -> - # If this is happening then there's something bad going on and our - # storage is just accruing. - Logger.error("AsyncDeleter: rename failed for #{path}: #{inspect(reason)}") - {:error, reason} + # e.g. ENOSPC: cannot move into the trash dir right now. Hand off so the + # deleter retries the capture once space frees up. + Logger.error("AsyncDeleter: rename failed for #{path}: #{inspect(reason)} - will retry") + GenServer.cast(name(stack_id), {:capture_failed, path}) + :ok end end @@ -76,17 +87,41 @@ defmodule Electric.AsyncDeleter do Electric.Telemetry.Sentry.set_tags_context(stack_id: stack_id) trash_dir = trash_dir!(stack_id) - File.mkdir_p(trash_dir) - state = %__MODULE__{ - stack_id: stack_id, - interval_ms: Keyword.get(opts, :cleanup_interval_ms, @default_cleanup_interval_ms), - pending: File.ls!(trash_dir) - } + {ready?, pending} = + case File.mkdir_p(trash_dir) do + :ok -> + {true, list_trash(trash_dir)} + + {:error, reason} -> + Logger.error( + "AsyncDeleter: could not create trash directory #{trash_dir}: #{inspect(reason)} " <> + "- will retry. Deletes will be queued until the directory is writable." + ) + + {false, []} + end + + state = + maybe_arm_heal(%__MODULE__{ + stack_id: stack_id, + interval_ms: Keyword.get(opts, :cleanup_interval_ms, @default_cleanup_interval_ms), + trash_dir_ready: ready?, + pending: pending + }) {:ok, state, {:continue, :initial_cleanup}} end + # List trash contents without raising; an unreadable/absent dir means nothing + # pending to sweep right now. + defp list_trash(trash_dir) do + case File.ls(trash_dir) do + {:ok, entries} -> entries + {:error, _reason} -> [] + end + end + @impl true def handle_continue(:initial_cleanup, state) do {:noreply, do_cleanup(state)} @@ -107,6 +142,16 @@ defmodule Electric.AsyncDeleter do {:noreply, %{state | pending: [path | state.pending]}, {:continue, :schedule_cleanup}} end + def handle_cast({:capture_failed, source}, state) do + state = + state + |> add_pending_source(source) + |> drain_pending_sources() + |> maybe_arm_heal() + + {:noreply, state, {:continue, :schedule_cleanup}} + end + defp unique_destination(trash_dir, base) do suffix = System.unique_integer([:positive]) |> to_string() Path.join(trash_dir, base <> "_" <> suffix) @@ -158,6 +203,26 @@ defmodule Electric.AsyncDeleter do # ignore down messages for normal task termination, already handled in result message def handle_info({:DOWN, _ref, :process, _pid, :normal}, state), do: {:noreply, state} + def handle_info(:ensure_trash_dir, state) do + state = + %{state | heal_timer_ref: nil} + |> drain_pending_sources() + |> maybe_arm_heal() + + case state.pending_sources do + [] -> + :ok + + sources -> + Logger.warning( + "AsyncDeleter: trash directory still un-writable, " <> + "#{length(sources)} deletion(s) queued for stack #{state.stack_id} - will keep retrying" + ) + end + + {:noreply, state, {:continue, :schedule_cleanup}} + end + @impl true def terminate(reason, state) do # We want to avoid AsyncDeleter being brought back up while a cleanup task is still running, @@ -188,6 +253,66 @@ defmodule Electric.AsyncDeleter do end end + # Arm the self-heal timer iff we are degraded (dir not ready or sources waiting). + # Idempotent: never stacks timers; goes silent once healthy. + defp maybe_arm_heal(%{heal_timer_ref: ref} = state) when is_reference(ref), do: state + + defp maybe_arm_heal(%{trash_dir_ready: true, pending_sources: []} = state), do: state + + defp maybe_arm_heal(state) do + %{state | heal_timer_ref: Process.send_after(self(), :ensure_trash_dir, state.interval_ms)} + end + + defp add_pending_source(state, source) do + if source in state.pending_sources do + state + else + %{state | pending_sources: [source | state.pending_sources]} + end + end + + # Ensure the trash dir exists, then try to move each still-live source into it. + # Captured sources become `pending` (to be swept); vanished sources are dropped; + # sources that still cannot be moved stay in `pending_sources`. + defp drain_pending_sources(%{pending_sources: []} = state), do: state + + defp drain_pending_sources(state) do + trash_dir = trash_dir!(state.stack_id) + + case File.mkdir_p(trash_dir) do + :ok -> + {captured, still_pending} = + Enum.reduce(state.pending_sources, {[], []}, fn source, {captured, still_pending} -> + case do_rename(source, trash_dir) do + {:ok, _dest} -> + {[source | captured], still_pending} + + {:error, :enoent} -> + # Source vanished out from under us; nothing to reclaim. + if File.exists?(source) do + {captured, [source | still_pending]} + else + {captured, still_pending} + end + + {:error, _reason} -> + {captured, [source | still_pending]} + end + end) + + %{ + state + | trash_dir_ready: true, + pending_sources: still_pending, + pending: captured ++ state.pending + } + + {:error, _reason} -> + # Still cannot create the trash dir; keep everything queued. + %{state | trash_dir_ready: false} + end + end + defp do_cleanup(%{pending: []} = state), do: state defp do_cleanup(state) do diff --git a/packages/sync-service/test/electric/async_deleter_test.exs b/packages/sync-service/test/electric/async_deleter_test.exs index f9fca6ce94..9b4b366fc6 100644 --- a/packages/sync-service/test/electric/async_deleter_test.exs +++ b/packages/sync-service/test/electric/async_deleter_test.exs @@ -42,10 +42,12 @@ defmodule Electric.AsyncDeleterTest do end setup ctx do - start_link_supervised!( - {AsyncDeleter, - stack_id: ctx.stack_id, storage_dir: ctx.tmp_dir, cleanup_interval_ms: @interval} - ) + unless ctx[:no_default_deleter] do + start_link_supervised!( + {AsyncDeleter, + stack_id: ctx.stack_id, storage_dir: ctx.tmp_dir, cleanup_interval_ms: @interval} + ) + end :ok end @@ -151,6 +153,85 @@ defmodule Electric.AsyncDeleterTest do assert_dir_empty(trash_dir) end + describe "resilient boot" do + @describetag :no_default_deleter + + setup ctx do + # Put a regular file where the `.electric_trash` directory needs to go, + # so File.mkdir_p of the trash dir fails with :enotdir regardless of uid. + trash_base = Path.join(ctx.tmp_dir, ".electric_trash") + File.write!(trash_base, "obstruction") + [trash_base: trash_base] + end + + test "boots without crashing when the trash dir cannot be created", ctx do + pid = + start_link_supervised!( + {AsyncDeleter, + stack_id: ctx.stack_id, storage_dir: ctx.tmp_dir, cleanup_interval_ms: @interval} + ) + + assert Process.alive?(pid) + end + + test "delete hands off a live source when the trash dir is missing", ctx do + start_link_supervised!( + {AsyncDeleter, + stack_id: ctx.stack_id, storage_dir: ctx.tmp_dir, cleanup_interval_ms: @interval} + ) + + dir = Path.join(ctx.tmp_dir, "live_shape") + File.mkdir_p!(dir) + File.write!(Path.join(dir, "f.txt"), "data") + + assert :ok = AsyncDeleter.delete(ctx.stack_id, dir) + assert File.exists?(dir) + end + + test "deleting a missing source still returns ok with no trash dir", ctx do + start_link_supervised!( + {AsyncDeleter, + stack_id: ctx.stack_id, storage_dir: ctx.tmp_dir, cleanup_interval_ms: @interval} + ) + + gone = Path.join(ctx.tmp_dir, "never_existed") + refute File.exists?(gone) + assert :ok = AsyncDeleter.delete(ctx.stack_id, gone) + end + + test "recaptures and reclaims a handed-off source after the obstruction clears", ctx do + start_link_supervised!( + {AsyncDeleter, + stack_id: ctx.stack_id, storage_dir: ctx.tmp_dir, cleanup_interval_ms: @interval} + ) + + dir = Path.join(ctx.tmp_dir, "live_shape") + File.mkdir_p!(dir) + File.write!(Path.join(dir, "f.txt"), "data") + + assert :ok = AsyncDeleter.delete(ctx.stack_id, dir) + assert File.exists?(dir) + + # Clear the obstruction so the trash dir can be created on the next heal tick. + File.rm!(ctx.trash_base) + + # Within a few heal/sweep intervals: dir captured into trash, then reaped. + wait_until(fn -> not File.exists?(dir) end, 5_000) + refute File.exists?(dir) + end + end + + defp wait_until(fun, timeout) when timeout > 0 do + if fun.() do + :ok + else + Process.sleep(@interval) + wait_until(fun, timeout - @interval) + end + end + + defp wait_until(_fun, _timeout), do: :ok + defp assert_dir_empty(dir, timeout \\ 500) do assert File.ls!(dir) == [] rescue