Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/small-shape-refs.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@core/sync-service": patch
---

Reduce memory use by using compact runtime shape refs internally.
1 change: 1 addition & 0 deletions packages/sync-service/lib/electric.ex
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ defmodule Electric do
@type pg_connection_opts :: [unquote(NimbleOptions.option_typespec(opts_schema))]
@type stack_id :: binary()
@type shape_handle() :: binary()
@type shape_id() :: non_neg_integer()

default = fn key -> inspect(Electric.Config.default(key)) end

Expand Down
191 changes: 107 additions & 84 deletions packages/sync-service/lib/electric/replication/shape_log_collector.ex
Original file line number Diff line number Diff line change
Expand Up @@ -129,13 +129,13 @@ defmodule Electric.Replication.ShapeLogCollector do
Adds a shape to the shape matching index in the ShapeLogCollector
used for matching and sending replication stream operations.
"""
defdelegate add_shape(stack_id, shape_handle, shape, operation), to: __MODULE__.RequestBatcher
defdelegate add_shape(stack_id, shape_id, shape, operation), to: __MODULE__.RequestBatcher

@doc """
Removes a shape from the shape matching index in the ShapeLogCollector.
This call succeeds before the shape is actually removed from the index.
"""
defdelegate remove_shape(stack_id, shape_handle), to: __MODULE__.RequestBatcher
defdelegate remove_shape(stack_id, shape_id), to: __MODULE__.RequestBatcher

@doc """
Handles batched shape registration updates from the RequestBatcher.
Expand All @@ -158,16 +158,16 @@ defmodule Electric.Replication.ShapeLogCollector do

Should be called by consumer processes after they flush data.
"""
@spec notify_flushed(Electric.stack_id(), Electric.shape_handle(), LogOffset.t()) :: :ok
def notify_flushed(stack_id, shape_handle, offset) do
GenServer.cast(name(stack_id), {:writer_flushed, shape_handle, offset})
@spec notify_flushed(Electric.stack_id(), Electric.shape_id(), LogOffset.t()) :: :ok
def notify_flushed(stack_id, shape_id, offset) do
GenServer.cast(name(stack_id), {:writer_flushed, shape_id, offset})
end

@doc """
Returns the list of currently active shapes being tracked
in the shape matching filters.
"""
@spec active_shapes(Electric.stack_id()) :: MapSet.t(Electric.shape_handle())
@spec active_shapes(Electric.stack_id()) :: MapSet.t(Electric.shape_id())
def active_shapes(stack_id) do
GenServer.call(name(stack_id), :active_shapes)
end
Expand Down Expand Up @@ -241,7 +241,6 @@ defmodule Electric.Replication.ShapeLogCollector do
tracked_relations: tracker_state,
partitions: Partitions.new(Keyword.new(opts)),
dependency_layers: DependencyLayers.new(),
pids_by_shape_handle: %{},
event_router:
opts
|> Keyword.new()
Expand Down Expand Up @@ -271,30 +270,41 @@ defmodule Electric.Replication.ShapeLogCollector do

{partitions, event_router, layers, count} =
state.stack_id
|> Electric.ShapeCache.ShapeStatus.list_shapes()
|> Electric.ShapeCache.ShapeStatus.list_shapes_with_ids()
|> Enum.reduce(
{state.partitions, state.event_router, state.dependency_layers, 0},
fn {shape_handle, shape}, {partitions, event_router, layers, count} ->
fn {shape_handle, shape_id, shape}, {partitions, event_router, layers, count} = acc ->
# Check dependencies first - if a parent shape failed to restore,
# we should skip this shape (and its children will also be skipped)
case DependencyLayers.add_dependency(layers, shape, shape_handle) do
{:ok, layers} ->
partitions = restore_partitions_for_shape(partitions, shape_handle, shape)
with {:ok, dependency_ids} <- dependency_ids(state.stack_id, shape),
{:ok, layers} <-
DependencyLayers.add_dependency(layers, dependency_ids, shape_id) do
partitions = restore_partitions_for_shape(partitions, shape_id, shape)

{
partitions,
EventRouter.add_shape(event_router, shape_id, shape),
layers,
count + 1
}
else
# A dependency has no id mapping in ShapeStatus, e.g. it's
# mid-removal. Skip this shape.
:error ->
Logger.warning(
"Skipping shape during restore: dependency has no id mapping",
shape_handle: shape_handle
)

{
partitions,
EventRouter.add_shape(event_router, shape_handle, shape),
layers,
count + 1
}
acc

{:error, {:missing_dependencies, missing_deps}} ->
Logger.warning(
"Skipping shape during restore: missing dependencies #{inspect(MapSet.to_list(missing_deps))}",
shape_handle: shape_handle
)

{partitions, event_router, layers, count}
acc
end
end
)
Expand Down Expand Up @@ -347,24 +357,40 @@ defmodule Electric.Replication.ShapeLogCollector do
@restore_retry_delay_ms 100
@restore_max_retries 40

defp restore_partitions_for_shape(partitions, shape_handle, shape, attempt \\ 1) do
case Partitions.add_shape(partitions, shape_handle, shape) do
defp restore_partitions_for_shape(partitions, shape_id, shape, attempt \\ 1) do
case Partitions.add_shape(partitions, shape_id, shape) do
{:ok, partitions} ->
partitions

{:error, reason} when attempt >= @restore_max_retries ->
raise "Failed to restore partition info for shape #{shape_handle}: #{inspect(reason)}"
raise "Failed to restore partition info for shape #{shape_id}: #{inspect(reason)}"

{:error, reason} ->
if attempt == 1 do
Logger.warning(
"Retrying shape restore: failed to introspect #{Electric.Utils.inspect_relation(shape.root_table)}: #{inspect(reason)}",
shape_handle: shape_handle
shape_id: shape_id
)
end

Process.sleep(@restore_retry_delay_ms)
restore_partitions_for_shape(partitions, shape_handle, shape, attempt + 1)
restore_partitions_for_shape(partitions, shape_id, shape, attempt + 1)
end
end

# Translates each handle in `shape.shape_dependencies_handles` into the id
# reported by `Electric.ShapeCache.ShapeStatus.id_for_handle/2`.
#
# Returns `{:ok, ids}` with the ids in the same order as the handles, or
# `:error` as soon as one handle has no id mapping (remaining handles are
# not looked up).
defp dependency_ids(stack_id, shape) do
  resolved =
    Enum.reduce_while(shape.shape_dependencies_handles, [], fn dep_handle, ids ->
      case Electric.ShapeCache.ShapeStatus.id_for_handle(stack_id, dep_handle) do
        {:ok, dep_id} -> {:cont, [dep_id | ids]}
        :error -> {:halt, :error}
      end
    end)

  case resolved do
    :error ->
      :error

    # Ids were prepended while folding, so restore the original handle order.
    ids when is_list(ids) ->
      {:ok, Enum.reverse(ids)}
  end
end

Expand Down Expand Up @@ -433,48 +459,50 @@ defmodule Electric.Replication.ShapeLogCollector do
fn ->
{state, results} =
shapes_to_remove
|> Enum.reduce({state, %{}}, fn shape_handle, {state, results} ->
case remove_subscription(state, shape_handle) do
{:ok, state} -> {state, Map.put(results, shape_handle, :ok)}
{:error, reason} -> {state, Map.put(results, shape_handle, {:error, reason})}
|> Enum.reduce({state, %{}}, fn shape_id, {state, results} ->
case remove_subscription(state, shape_id) do
{:ok, state} -> {state, Map.put(results, shape_id, :ok)}
{:error, reason} -> {state, Map.put(results, shape_id, {:error, reason})}
end
end)

{state, results} =
shapes_to_add
|> Enum.reduce({state, results}, fn {shape_handle, shape}, {state, results} ->
case Partitions.add_shape(state.partitions, shape_handle, shape) do
{:ok, partitions} ->
case DependencyLayers.add_dependency(
state.dependency_layers,
shape,
shape_handle
) do
{:ok, dependency_layers} ->
state =
%{
state
| partitions: partitions,
event_router:
EventRouter.add_shape(state.event_router, shape_handle, shape),
dependency_layers: dependency_layers
}
|> Map.update!(:subscriptions, &(&1 + 1))
|> log_subscription_status()

{state, Map.put(results, shape_handle, :ok)}

{:error, {:missing_dependencies, missing_deps}} ->
Logger.warning(
"Shape cannot be added: missing dependencies #{inspect(MapSet.to_list(missing_deps))}",
shape_handle: shape_handle
)

{state, Map.put(results, shape_handle, {:error, :missing_dependencies})}
end
|> Enum.reduce({state, results}, fn {shape_id, shape}, {state, results} ->
with {:ok, dep_ids} <- dependency_ids(state.stack_id, shape),
{:ok, partitions} <- Partitions.add_shape(state.partitions, shape_id, shape) do
case DependencyLayers.add_dependency(
state.dependency_layers,
dep_ids,
shape_id
) do
{:ok, dependency_layers} ->
state =
%{
state
| partitions: partitions,
event_router: EventRouter.add_shape(state.event_router, shape_id, shape),
dependency_layers: dependency_layers
}
|> Map.update!(:subscriptions, &(&1 + 1))
|> log_subscription_status()

{state, Map.put(results, shape_id, :ok)}

{:error, {:missing_dependencies, missing_deps}} ->
Logger.warning(
"Shape cannot be added: missing dependencies #{inspect(MapSet.to_list(missing_deps))}",
shape_id: shape_id
)

{state, Map.put(results, shape_id, {:error, :missing_dependencies})}
end
else
:error ->
{state, Map.put(results, shape_id, {:error, :missing_dependencies})}

{:error, reason} ->
{state, Map.put(results, shape_handle, {:error, reason})}
{state, Map.put(results, shape_id, {:error, reason})}
end
end)

Expand Down Expand Up @@ -627,12 +655,12 @@ defmodule Electric.Replication.ShapeLogCollector do
defp publish(state, event) do
OpenTelemetry.start_interval(:"shape_log_collector.event_routing.duration_µs")

{events_by_handle, event_router} =
EventRouter.event_by_shape_handle(state.event_router, event)
{events_by_id, event_router} =
EventRouter.event_by_shape_id(state.event_router, event)

state = %{state | event_router: event_router}

affected_shapes = Map.keys(events_by_handle) |> MapSet.new()
affected_shapes = Map.keys(events_by_id) |> MapSet.new()
affected_shape_count = MapSet.size(affected_shapes)

OpenTelemetry.add_span_attributes(
Expand All @@ -649,13 +677,13 @@ defmodule Electric.Replication.ShapeLogCollector do
context = OpenTelemetry.get_current_context()

undeliverable_set =
for layer <- DependencyLayers.get_for_handles(state.dependency_layers, affected_shapes),
for layer <- DependencyLayers.get_for_shape_ids(state.dependency_layers, affected_shapes),
reduce: MapSet.new() do
acc ->
# Each publish is synchronous, so layers will be processed in order
layer_events =
Map.new(layer, fn handle ->
{handle, {:handle_event, Map.fetch!(events_by_handle, handle), context}}
Map.new(layer, fn id ->
{id, {:handle_event, Map.fetch!(events_by_id, id), context}}
end)

layer_undeliverable = ConsumerRegistry.publish(layer_events, state.registry_state)
Expand All @@ -672,8 +700,8 @@ defmodule Electric.Replication.ShapeLogCollector do
# fragments but are now undeliverable. This prevents stuck flush when
# a consumer processes fragment 1 but crashes on fragment 2.
flush_tracker =
Enum.reduce(undeliverable_set, state.flush_tracker, fn shape_handle, tracker ->
FlushTracker.handle_shape_removed(tracker, shape_handle)
Enum.reduce(undeliverable_set, state.flush_tracker, fn shape_id, tracker ->
FlushTracker.handle_shape_removed(tracker, shape_id)
end)

flush_tracker =
Expand Down Expand Up @@ -763,40 +791,36 @@ defmodule Electric.Replication.ShapeLogCollector do
end
end

defp remove_subscription(%{subscriptions: count} = state, shape_handle) do
defp remove_subscription(%{subscriptions: count} = state, shape_id) do
OpenTelemetry.with_span(
"shape_log_collector.remove_shape",
[shape_handle: shape_handle],
[shape_id: shape_id],
state.stack_id,
fn ->
if EventRouter.has_shape?(state.event_router, shape_handle) do
Logger.debug("Deleting shape #{shape_handle}")
if EventRouter.has_shape?(state.event_router, shape_id) do
Logger.debug(fn ->
"Deleting shape #{Electric.ShapeCache.ShapeStatus.shape_handle_for_log(state.stack_id, shape_id)}"
end)

OpenTelemetry.start_interval(:"unsubscribe_shape.remove_subscription.duration_µs")

OpenTelemetry.start_interval(:"unsubscribe_shape.remove_from_event_router.duration_µs")
event_router = EventRouter.remove_shape(state.event_router, shape_handle)
event_router = EventRouter.remove_shape(state.event_router, shape_id)

OpenTelemetry.start_interval(:"unsubscribe_shape.remove_from_partitions.duration_µs")
partitions = Partitions.remove_shape(state.partitions, shape_handle)

OpenTelemetry.start_interval(
:"unsubscribe_shape.remove_pids_by_shape_handle.duration_µs"
)

pids_by_shape_handle = Map.delete(state.pids_by_shape_handle, shape_handle)
partitions = Partitions.remove_shape(state.partitions, shape_id)

OpenTelemetry.start_interval(:"unsubscribe_shape.remove_from_flush_tracker.duration_µs")
flush_tracker = FlushTracker.handle_shape_removed(state.flush_tracker, shape_handle)
flush_tracker = FlushTracker.handle_shape_removed(state.flush_tracker, shape_id)

OpenTelemetry.start_interval(
:"unsubscribe_shape.remove_from_dependency_layers.duration_µs"
)

dependency_layers =
DependencyLayers.remove_dependency(state.dependency_layers, shape_handle)
DependencyLayers.remove_dependency(state.dependency_layers, shape_id)

Electric.Shapes.ConsumerRegistry.remove_consumer(shape_handle, state.registry_state)
Electric.Shapes.ConsumerRegistry.remove_consumer(shape_id, state.registry_state)

OpenTelemetry.stop_and_save_intervals(
total_attribute: "unsubscribe_shape.total_duration_µs"
Expand All @@ -808,7 +832,6 @@ defmodule Electric.Replication.ShapeLogCollector do
| subscriptions: count - 1,
event_router: event_router,
partitions: partitions,
pids_by_shape_handle: pids_by_shape_handle,
dependency_layers: dependency_layers,
flush_tracker: flush_tracker
}
Expand All @@ -817,7 +840,7 @@ defmodule Electric.Replication.ShapeLogCollector do
# This may happen as we attempt to remove a shape multiple times
# depending on the source of the delete, on the understanding that
# removal is idempotent.
{:error, "shape #{shape_handle} not registered"}
{:error, "shape #{shape_id} not registered"}
end
end
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ defmodule Electric.Replication.ShapeLogCollector.FlushTracker do
alias Electric.Replication.LogOffset
alias Electric.Replication.Changes.{Commit, TransactionFragment}

@type shape_id() :: term()
@type shape_id() :: Electric.shape_id()

defstruct [
:last_global_flushed_offset,
Expand Down
Loading
Loading