Skip to content

Commit caf756f

Browse files
committed
catch handle_call timeouts when sending to SlotProcessorServer
1 parent a5b416a commit caf756f

1 file changed

Lines changed: 17 additions & 1 deletion

File tree

lib/sequin/runtime/slot_producer/reorder_buffer.ex

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,15 @@ defmodule Sequin.Runtime.SlotProducer.ReorderBuffer do
119119

120120
state = maybe_cancel_flush_batch_timer(state)
121121

122-
case state.flush_batch_fn.(state.id, batch) do
122+
result =
123+
try do
124+
state.flush_batch_fn.(state.id, batch)
125+
catch
126+
:exit, reason ->
127+
{:exit, reason}
128+
end
129+
130+
case result do
123131
:ok ->
124132
state = %{state | ready_batches_by_idx: ready_batches}
125133

@@ -133,6 +141,14 @@ defmodule Sequin.Runtime.SlotProducer.ReorderBuffer do
133141
Logger.warning("[ReorderBuffer] Error pushing batch #{inspect(batch)}: #{inspect(reason)}")
134142
state = schedule_flush_timer_retry(state)
135143
{:noreply, [], state}
144+
145+
{:exit, reason} ->
146+
Logger.warning(
147+
"[ReorderBuffer] Exit while flushing batch #{inspect(batch)}: #{inspect(reason)}"
148+
)
149+
150+
state = schedule_flush_timer_retry(state)
151+
{:noreply, [], state}
136152
end
137153
end
138154

0 commit comments

Comments
 (0)