@@ -10,13 +10,16 @@ defmodule Extensions.PostgresCdcRls.SubscriptionManager do
1010
1111 alias Realtime.Database
1212 alias Realtime.Helpers
13+ alias Realtime.GenRpc
14+ alias Realtime.Telemetry
1315
1416 alias Rls.Subscriptions
1517
# NOTE(review): usage of @timeout is not visible in this chunk — presumably a
# timeout in milliseconds; confirm against its call sites.
@timeout 15_000
# NOTE(review): usage not visible here — presumably the maximum number of
# records removed per delete batch; confirm against its call sites.
@max_delete_records 1000
# Delay (ms) passed to Process.send_after/3 for the :check_oids message.
@check_oids_interval 60_000
# Delay (ms) passed to Process.send_after/3 for the :check_no_users message.
@check_no_users_interval 60_000
# Delay (ms) passed to Process.send_after/3 for the :check_active_pids message.
@check_active_pids_interval 120_000
# NOTE(review): usage not visible here — evaluates to 600_000; presumably
# milliseconds. Confirm against its call sites.
@stop_after 60_000 * 10
2124
2225 defmodule State do
@@ -32,6 +35,7 @@ defmodule Extensions.PostgresCdcRls.SubscriptionManager do
3235 no_users_ts: nil ,
3336 oids: % { } ,
3437 check_oid_ref: nil ,
38+ check_active_pids_ref: nil ,
3539 check_region_interval: nil
3640 ]
3741
@@ -43,6 +47,7 @@ defmodule Extensions.PostgresCdcRls.SubscriptionManager do
4347 conn: Postgrex . conn ( ) ,
4448 oids: map ( ) ,
4549 check_oid_ref: reference ( ) | nil ,
50+ check_active_pids_ref: reference ( ) | nil ,
4651 delete_queue: % {
4752 ref: reference ( ) ,
4853 queue: :queue . queue ( )
@@ -107,6 +112,7 @@ defmodule Extensions.PostgresCdcRls.SubscriptionManager do
107112 queue: :queue . new ( )
108113 } ,
109114 no_users_ref: check_no_users ( ) ,
115+ check_active_pids_ref: check_active_pids ( ) ,
110116 check_region_interval: check_region_interval
111117 }
112118
@@ -251,6 +257,34 @@ defmodule Extensions.PostgresCdcRls.SubscriptionManager do
251257 end
252258 end
253259
# Periodic sweep for subscriber pids that are no longer alive.
#
# Groups the pids stored in `subscribers_pids_table` by node, asks each node
# which of them are dead, removes those entries from both ETS tables (via
# `pop_not_alive_pids/4`) and enqueues the returned subscription ids on the
# delete queue, skipping ids that are already queued. The sweep is always
# re-armed through `check_active_pids/0` before returning.
def handle_info(
      :check_active_pids,
      %State{check_active_pids_ref: ref, delete_queue: delete_queue, id: id} = state
    ) do
  Helpers.cancel_timer(ref)

  ids =
    state.subscribers_pids_table
    |> subscribers_by_node()
    |> not_alive_pids_dist()
    |> pop_not_alive_pids(state.subscribers_pids_table, state.subscribers_nodes_table, id)

  new_delete_queue =
    case ids do
      # Nothing to delete: keep the current queue and its timer untouched.
      [] ->
        delete_queue

      [_ | _] ->
        queue =
          Enum.reduce(ids, delete_queue.queue, fn subscription_id, acc ->
            if :queue.member(subscription_id, acc),
              do: acc,
              else: :queue.in(subscription_id, acc)
          end)

        # NOTE(review): check_delete_queue/1 is defined outside this view —
        # presumably it schedules delete-queue processing after the given
        # delay in ms; confirm.
        Helpers.cancel_timer(delete_queue.ref)
        %{ref: check_delete_queue(1_000), queue: queue}
    end

  {:noreply, %{state | check_active_pids_ref: check_active_pids(), delete_queue: new_delete_queue}}
end
287+
254288 def handle_info ( msg , state ) do
255289 log_error ( "UnhandledProcessMessage" , msg )
256290
@@ -259,8 +293,74 @@ defmodule Extensions.PostgresCdcRls.SubscriptionManager do
259293
260294 ## Internal functions
261295
@doc """
Removes the given (dead) `pids` from the subscriber ETS tables.

For every pid found in `subscribers_pids_table`, its entries are deleted from
that table and the matching subscription id (converted to its binary form) is
deleted from `subscribers_nodes_table`. A telemetry event is emitted per
removed entry (`:phantom_pid_detected`) and per pid that has no entry at all
(`:pid_not_found`), both tagged with `tenant_id`.

Returns the binary subscription ids that were removed.
"""
@spec pop_not_alive_pids([pid()], :ets.tid(), :ets.tid(), binary()) :: [Ecto.UUID.raw()]
def pop_not_alive_pids(pids, subscribers_pids_table, subscribers_nodes_table, tenant_id) do
  Enum.reduce(pids, [], fn pid, acc ->
    case :ets.lookup(subscribers_pids_table, pid) do
      [] ->
        Telemetry.execute(
          [:realtime, :subscription_manager, :pid_not_found],
          %{quantity: 1},
          %{tenant_id: tenant_id}
        )

        acc

      results ->
        removed_ids =
          for {^pid, postgres_id, _ref, _node} <- results do
            Telemetry.execute(
              [:realtime, :subscription_manager, :phantom_pid_detected],
              %{quantity: 1},
              %{tenant_id: tenant_id}
            )

            # `results` was read before any deletion, so removing the pid's
            # entries while iterating does not affect this loop.
            :ets.delete(subscribers_pids_table, pid)
            bin_id = UUID.string_to_binary!(postgres_id)

            :ets.delete(subscribers_nodes_table, bin_id)
            bin_id
          end

        removed_ids ++ acc
    end
  end)
end
326+
@doc """
Groups the pids stored in the ETS table `tid` by the node recorded with them.

Table entries are expected to be `{pid, postgres_id, ref, node}` tuples. The
result maps each node to the set of pids registered for it; an empty table
yields an empty map.
"""
@spec subscribers_by_node(:ets.tid()) :: %{node() => MapSet.t(pid())}
def subscribers_by_node(tid) do
  :ets.foldl(
    fn {pid, _postgres_id, _ref, node}, acc ->
      Map.update(acc, node, MapSet.new([pid]), &MapSet.put(&1, pid))
    end,
    %{},
    tid
  )
end
336+
@doc """
Returns the pids from `pids` (a map of node => set of pids) that are not alive.

Pids registered for the local node are checked directly with
`not_alive_pids/1`; pids for any other node are checked by calling
`not_alive_pids/1` on that node through `GenRpc.call/5`. When the remote call
returns `{:error, :rpc_error, _}` the error is logged and that node
contributes no pids to the result.
"""
@spec not_alive_pids_dist(%{node() => MapSet.t(pid())}) :: [pid()]
def not_alive_pids_dist(pids) do
  Enum.flat_map(pids, fn {target_node, node_pids} ->
    if target_node == node() do
      not_alive_pids(node_pids)
    else
      case GenRpc.call(target_node, __MODULE__, :not_alive_pids, [node_pids], timeout: 15_000) do
        {:error, :rpc_error, _} = error ->
          log_error("UnableToCheckProcessesOnRemoteNode", error)
          []

        dead_pids ->
          dead_pids
      end
    end
  end)
end
354+
@doc """
Returns the members of `pids` for which `Process.alive?/1` is false.

This is the function `not_alive_pids_dist/1` invokes, either directly or
through `GenRpc` on the node the pids were registered for. The result is in
reverse enumeration order of `pids`.
"""
@spec not_alive_pids(MapSet.t(pid())) :: [pid()] | []
def not_alive_pids(pids) do
  pids
  |> Enum.reject(&Process.alive?/1)
  |> Enum.reverse()
end
359+
# Sends `:check_oids` to this process after @check_oids_interval ms and
# returns the timer reference.
defp check_oids do
  Process.send_after(self(), :check_oids, @check_oids_interval)
end
263361
# Sends `:check_active_pids` to this process after
# @check_active_pids_interval ms and returns the timer reference.
defp check_active_pids do
  Process.send_after(self(), :check_active_pids, @check_active_pids_interval)
end
363+
# Current system time in milliseconds.
defp now do
  System.system_time(:millisecond)
end
265365
# Sends `:check_no_users` to this process after @check_no_users_interval ms
# and returns the timer reference.
defp check_no_users do
  Process.send_after(self(), :check_no_users, @check_no_users_interval)
end
0 commit comments