Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/async-deleter-full-disk-boot.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@core/sync-service": patch
---

Fix `AsyncDeleter` crashing the stack on boot when the storage volume is full. It now boots resiliently, logs the real error (e.g. `ENOSPC`) instead of a misleading `ENOENT`, and self-heals — creating the trash directory and recapturing pending deletions once disk space is available again.
151 changes: 138 additions & 13 deletions packages/sync-service/lib/electric/async_deleter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@ defmodule Electric.AsyncDeleter do
timer_ref: nil,
cleanup_task: nil,
pending: [],
in_progress: []
in_progress: [],
trash_dir_ready: false,
pending_sources: [],
heal_timer_ref: nil
]

@trash_dir_base ".electric_trash"
Expand Down Expand Up @@ -56,14 +59,22 @@ defmodule Electric.AsyncDeleter do
:ok

{:error, :enoent} ->
Logger.debug("AsyncDeleter: path already gone #{path}")
:ok
if File.exists?(path) do
# The source still exists, so the :enoent is about the *trash dir*
# being missing — hand the live path to the deleter to capture later.
GenServer.cast(name(stack_id), {:capture_failed, path})
:ok
else
Logger.debug("AsyncDeleter: path already gone #{path}")
:ok
end

{:error, reason} ->
# If this is happening then there's something bad going on and our
# storage is just accruing.
Logger.error("AsyncDeleter: rename failed for #{path}: #{inspect(reason)}")
{:error, reason}
# e.g. ENOSPC: cannot move into the trash dir right now. Hand off so the
# deleter retries the capture once space frees up.
Logger.error("AsyncDeleter: rename failed for #{path}: #{inspect(reason)} - will retry")
GenServer.cast(name(stack_id), {:capture_failed, path})
:ok
end
end

Expand All @@ -76,17 +87,41 @@ defmodule Electric.AsyncDeleter do
Electric.Telemetry.Sentry.set_tags_context(stack_id: stack_id)

trash_dir = trash_dir!(stack_id)
File.mkdir_p(trash_dir)

state = %__MODULE__{
stack_id: stack_id,
interval_ms: Keyword.get(opts, :cleanup_interval_ms, @default_cleanup_interval_ms),
pending: File.ls!(trash_dir)
}
{ready?, pending} =
case File.mkdir_p(trash_dir) do
:ok ->
{true, list_trash(trash_dir)}

{:error, reason} ->
Logger.error(
"AsyncDeleter: could not create trash directory #{trash_dir}: #{inspect(reason)} " <>
"- will retry. Deletes will be queued until the directory is writable."
)

{false, []}
end

state =
maybe_arm_heal(%__MODULE__{
stack_id: stack_id,
interval_ms: Keyword.get(opts, :cleanup_interval_ms, @default_cleanup_interval_ms),
trash_dir_ready: ready?,
pending: pending
})

{:ok, state, {:continue, :initial_cleanup}}
end

# List trash contents without raising; an unreadable/absent dir means nothing
# pending to sweep right now.
defp list_trash(trash_dir) do
case File.ls(trash_dir) do
{:ok, entries} -> entries
{:error, _reason} -> []
end
end

@impl true
def handle_continue(:initial_cleanup, state) do
{:noreply, do_cleanup(state)}
Expand All @@ -107,6 +142,16 @@ defmodule Electric.AsyncDeleter do
{:noreply, %{state | pending: [path | state.pending]}, {:continue, :schedule_cleanup}}
end

def handle_cast({:capture_failed, source}, state) do
state =
state
|> add_pending_source(source)
|> drain_pending_sources()
|> maybe_arm_heal()

{:noreply, state, {:continue, :schedule_cleanup}}
end

defp unique_destination(trash_dir, base) do
suffix = System.unique_integer([:positive]) |> to_string()
Path.join(trash_dir, base <> "_" <> suffix)
Expand Down Expand Up @@ -158,6 +203,26 @@ defmodule Electric.AsyncDeleter do
# ignore down messages for normal task termination, already handled in result message
def handle_info({:DOWN, _ref, :process, _pid, :normal}, state), do: {:noreply, state}

def handle_info(:ensure_trash_dir, state) do
state =
%{state | heal_timer_ref: nil}
|> drain_pending_sources()
|> maybe_arm_heal()

case state.pending_sources do
[] ->
:ok

sources ->
Logger.warning(
"AsyncDeleter: trash directory still un-writable, " <>
"#{length(sources)} deletion(s) queued for stack #{state.stack_id} - will keep retrying"
)
end

{:noreply, state, {:continue, :schedule_cleanup}}
end

@impl true
def terminate(reason, state) do
# We want to avoid AsyncDeleter being brought back up while a cleanup task is still running,
Expand Down Expand Up @@ -188,6 +253,66 @@ defmodule Electric.AsyncDeleter do
end
end

# Arm the self-heal timer iff we are degraded (dir not ready or sources waiting).
# Idempotent: never stacks timers; goes silent once healthy.
defp maybe_arm_heal(%{heal_timer_ref: ref} = state) when is_reference(ref), do: state

defp maybe_arm_heal(%{trash_dir_ready: true, pending_sources: []} = state), do: state

defp maybe_arm_heal(state) do
%{state | heal_timer_ref: Process.send_after(self(), :ensure_trash_dir, state.interval_ms)}
end

defp add_pending_source(state, source) do
if source in state.pending_sources do
state
else
%{state | pending_sources: [source | state.pending_sources]}
end
end

# Ensure the trash dir exists, then try to move each still-live source into it.
# Captured sources become `pending` (to be swept); vanished sources are dropped;
# sources that still cannot be moved stay in `pending_sources`.
defp drain_pending_sources(%{pending_sources: []} = state), do: state

defp drain_pending_sources(state) do
trash_dir = trash_dir!(state.stack_id)

case File.mkdir_p(trash_dir) do
:ok ->
{captured, still_pending} =
Enum.reduce(state.pending_sources, {[], []}, fn source, {captured, still_pending} ->
case do_rename(source, trash_dir) do
{:ok, _dest} ->
{[source | captured], still_pending}

{:error, :enoent} ->
# Source vanished out from under us; nothing to reclaim.
if File.exists?(source) do
{captured, [source | still_pending]}
else
{captured, still_pending}
end

{:error, _reason} ->
{captured, [source | still_pending]}
end
end)

%{
state
| trash_dir_ready: true,
pending_sources: still_pending,
pending: captured ++ state.pending
}

{:error, _reason} ->
# Still cannot create the trash dir; keep everything queued.
%{state | trash_dir_ready: false}
end
end

defp do_cleanup(%{pending: []} = state), do: state

defp do_cleanup(state) do
Expand Down
89 changes: 85 additions & 4 deletions packages/sync-service/test/electric/async_deleter_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,12 @@ defmodule Electric.AsyncDeleterTest do
end

setup ctx do
start_link_supervised!(
{AsyncDeleter,
stack_id: ctx.stack_id, storage_dir: ctx.tmp_dir, cleanup_interval_ms: @interval}
)
unless ctx[:no_default_deleter] do
start_link_supervised!(
{AsyncDeleter,
stack_id: ctx.stack_id, storage_dir: ctx.tmp_dir, cleanup_interval_ms: @interval}
)
end

:ok
end
Expand Down Expand Up @@ -151,6 +153,85 @@ defmodule Electric.AsyncDeleterTest do
assert_dir_empty(trash_dir)
end

describe "resilient boot" do
@describetag :no_default_deleter

setup ctx do
# Put a regular file where the `.electric_trash` directory needs to go,
# so File.mkdir_p of the trash dir fails with :enotdir regardless of uid.
trash_base = Path.join(ctx.tmp_dir, ".electric_trash")
File.write!(trash_base, "obstruction")
[trash_base: trash_base]
end

test "boots without crashing when the trash dir cannot be created", ctx do
pid =
start_link_supervised!(
{AsyncDeleter,
stack_id: ctx.stack_id, storage_dir: ctx.tmp_dir, cleanup_interval_ms: @interval}
)

assert Process.alive?(pid)
end

test "delete hands off a live source when the trash dir is missing", ctx do
start_link_supervised!(
{AsyncDeleter,
stack_id: ctx.stack_id, storage_dir: ctx.tmp_dir, cleanup_interval_ms: @interval}
)

dir = Path.join(ctx.tmp_dir, "live_shape")
File.mkdir_p!(dir)
File.write!(Path.join(dir, "f.txt"), "data")

assert :ok = AsyncDeleter.delete(ctx.stack_id, dir)
assert File.exists?(dir)
end

test "deleting a missing source still returns ok with no trash dir", ctx do
start_link_supervised!(
{AsyncDeleter,
stack_id: ctx.stack_id, storage_dir: ctx.tmp_dir, cleanup_interval_ms: @interval}
)

gone = Path.join(ctx.tmp_dir, "never_existed")
refute File.exists?(gone)
assert :ok = AsyncDeleter.delete(ctx.stack_id, gone)
end

test "recaptures and reclaims a handed-off source after the obstruction clears", ctx do
start_link_supervised!(
{AsyncDeleter,
stack_id: ctx.stack_id, storage_dir: ctx.tmp_dir, cleanup_interval_ms: @interval}
)

dir = Path.join(ctx.tmp_dir, "live_shape")
File.mkdir_p!(dir)
File.write!(Path.join(dir, "f.txt"), "data")

assert :ok = AsyncDeleter.delete(ctx.stack_id, dir)
assert File.exists?(dir)

# Clear the obstruction so the trash dir can be created on the next heal tick.
File.rm!(ctx.trash_base)

# Within a few heal/sweep intervals: dir captured into trash, then reaped.
wait_until(fn -> not File.exists?(dir) end, 5_000)
refute File.exists?(dir)
end
end

defp wait_until(fun, timeout) when timeout > 0 do
if fun.() do
:ok
else
Process.sleep(@interval)
wait_until(fun, timeout - @interval)
end
end

defp wait_until(_fun, _timeout), do: :ok

defp assert_dir_empty(dir, timeout \\ 500) do
assert File.ls!(dir) == []
rescue
Expand Down
Loading