@@ -3,7 +3,7 @@ defmodule Explorer.Chain.PendingOperationsHelper do
33
44 import Ecto.Query
55
6- alias Explorer.Chain . { Hash , PendingBlockOperation , PendingTransactionOperation , Transaction }
6+ alias Explorer.Chain . { Block , Hash , PendingBlockOperation , PendingTransactionOperation , Transaction }
77 alias Explorer . { Helper , Repo }
88
99 defp transactions_batch_size ,
@@ -81,7 +81,7 @@ defmodule Explorer.Chain.PendingOperationsHelper do
8181 from (
8282 pto in PendingTransactionOperation ,
8383 join: t in assoc ( pto , :transaction ) ,
84- select: % { block_hash: t . block_hash , block_number: t . block_number } ,
84+ select: % { block_hash: t . block_hash , block_number: t . block_number , priority: pto . priority } ,
8585 limit: ^ batch_size
8686 )
8787
@@ -246,4 +246,82 @@ defmodule Explorer.Chain.PendingOperationsHelper do
246246 |> block_range_in_query ( max_block_number )
247247 |> Repo . exists? ( )
248248 end
249+
250+ @ doc """
251+ Inserts pending operations for the given block numbers.
252+ """
253+ @ spec insert_pending_operations ( [ integer ( ) ] , integer ( ) | nil ) :: { [ integer ( ) ] , [ Explorer.Chain.Transaction . t ( ) ] }
254+ def insert_pending_operations ( block_numbers , priority \\ nil ) do
255+ case pending_operations_type ( ) do
256+ "transactions" ->
257+ default_on_conflict = default_pto_on_conflict ( )
258+ transactions = Transaction . get_transactions_of_block_numbers ( block_numbers )
259+
260+ pto_params =
261+ transactions
262+ |> Transaction . filter_non_traceable_transactions ( )
263+ |> Enum . map ( & % { transaction_hash: & 1 . hash , priority: priority } )
264+ |> Helper . add_timestamps ( )
265+
266+ Repo . insert_all ( PendingTransactionOperation , pto_params ,
267+ on_conflict: default_on_conflict ,
268+ conflict_target: [ :transaction_hash ]
269+ )
270+
271+ { [ ] , transactions }
272+
273+ "blocks" ->
274+ default_on_conflict = default_pbo_on_conflict ( )
275+
276+ pbo_params =
277+ Block
278+ |> where ( [ b ] , b . number in ^ block_numbers )
279+ |> where ( [ b ] , b . consensus == true )
280+ |> select ( [ b ] , % { block_hash: b . hash , block_number: b . number } )
281+ |> Repo . all ( )
282+ |> add_priority ( priority )
283+ |> Helper . add_timestamps ( )
284+
285+ { _total , inserted } =
286+ Repo . insert_all ( PendingBlockOperation , pbo_params ,
287+ on_conflict: default_on_conflict ,
288+ conflict_target: [ :block_hash ] ,
289+ returning: [ :block_number ]
290+ )
291+
292+ { Enum . map ( inserted , & & 1 . block_number ) , [ ] }
293+ end
294+ end
295+
296+ defp default_pbo_on_conflict do
297+ from (
298+ pending_block_operation in PendingBlockOperation ,
299+ update: [
300+ set: [
301+ priority: fragment ( "EXCLUDED.priority" ) ,
302+ inserted_at: fragment ( "LEAST(?, EXCLUDED.inserted_at)" , pending_block_operation . inserted_at ) ,
303+ updated_at: fragment ( "GREATEST(?, EXCLUDED.updated_at)" , pending_block_operation . updated_at )
304+ ]
305+ ] ,
306+ where: is_nil ( pending_block_operation . priority ) and fragment ( "EXCLUDED.priority IS NOT NULL" )
307+ )
308+ end
309+
310+ defp default_pto_on_conflict do
311+ from (
312+ pending_transaction_operation in PendingTransactionOperation ,
313+ update: [
314+ set: [
315+ priority: fragment ( "EXCLUDED.priority" ) ,
316+ inserted_at: fragment ( "LEAST(?, EXCLUDED.inserted_at)" , pending_transaction_operation . inserted_at ) ,
317+ updated_at: fragment ( "GREATEST(?, EXCLUDED.updated_at)" , pending_transaction_operation . updated_at )
318+ ]
319+ ] ,
320+ where: is_nil ( pending_transaction_operation . priority ) and fragment ( "EXCLUDED.priority IS NOT NULL" )
321+ )
322+ end
323+
324+ defp add_priority ( params , priority ) do
325+ Enum . map ( params , & Map . merge ( & 1 , % { priority: priority } ) )
326+ end
249327end
0 commit comments