@@ -17,6 +17,8 @@ defmodule Mix.Tasks.Bench do
1717 * `--partition-count` - Number of partitions (default: schedulers_online)
1818 * `--max-messages` - Maximum messages to generate (default: unlimited, mutually exclusive with --duration)
1919 * `--through` - Pipeline stage to run through: "full", "reorder_buffer", or "sps" (default: "full")
20+ * `--tprof` - Enable tprof call_time profiling of Sequin.* modules (default: false)
21+ * `--tprof-limit` - Number of top functions to show in tprof report (default: 60)
2022
2123 ## Examples
2224
@@ -37,6 +39,9 @@ defmodule Mix.Tasks.Bench do
3739
3840 # Run through SlotProcessorServer (includes message handler, stops before Broadway)
3941 mix benchmark --through sps
42+
43+ # Run with tprof profiling to see which functions dominate wall time
44+ mix benchmark --duration 30 --tprof
4045 """
4146 use Mix.Task
4247
@@ -80,7 +85,9 @@ defmodule Mix.Tasks.Bench do
8085 pk_collision_rate: :float ,
8186 partition_count: :integer ,
8287 max_messages: :integer ,
83- through: :string
88+ through: :string ,
89+ tprof: :boolean ,
90+ tprof_limit: :integer
8491 ]
8592 )
8693
@@ -91,6 +98,8 @@ defmodule Mix.Tasks.Bench do
9198 partition_count = Keyword . get ( opts , :partition_count , @ default_partition_count )
9299 max_messages = Keyword . get ( opts , :max_messages )
93100 through = opts |> Keyword . get ( :through , "full" ) |> String . to_existing_atom ( )
101+ tprof? = Keyword . get ( opts , :tprof , false )
102+ tprof_limit = Keyword . get ( opts , :tprof_limit , 60 )
94103
95104 if max_messages && duration_opt do
96105 Mix . raise ( "--duration and --max-messages are mutually exclusive" )
@@ -106,6 +115,11 @@ defmodule Mix.Tasks.Bench do
106115 # Start the application
107116 Mix.Task . run ( "app.start" )
108117
118+ # Start tprof if requested
119+ if tprof? do
120+ start_tprof ( )
121+ end
122+
109123 announce ( "#{ @ bold } === Sequin Pipeline Benchmark ===#{ @ reset } " , @ cyan )
110124 IO . puts ( "" )
111125
@@ -117,6 +131,7 @@ defmodule Mix.Tasks.Bench do
117131 IO . puts ( " Partition count: #{ partition_count } " )
118132 IO . puts ( " Max messages: #{ max_messages || "unlimited" } " )
119133 IO . puts ( " Through: #{ through } " )
134+ IO . puts ( " tprof: #{ tprof? } " )
120135 IO . puts ( "" )
121136
122137 # Setup replication slot
@@ -321,10 +336,76 @@ defmodule Mix.Tasks.Bench do
321336 pipeline_tracked
322337 )
323338
339+ # Print tprof report if enabled
340+ if tprof? do
341+ stop_and_report_tprof ( tprof_limit )
342+ end
343+
324344 # Cleanup
325345 cleanup_entities ( consumer , replication )
326346 end
327347
348+ defp start_tprof do
349+ :tprof . start ( % { type: :call_time } )
350+ :tprof . enable_trace ( :all )
351+
352+ for { mod , _ } <- :code . all_loaded ( ) ,
353+ mod_str = Atom . to_string ( mod ) ,
354+ String . starts_with? ( mod_str , "Elixir.Sequin." ) do
355+ :tprof . set_pattern ( mod , :_ , :_ )
356+ end
357+
358+ announce ( "tprof profiling enabled (tracing Sequin.* modules)" , @ yellow )
359+ end
360+
361+ defp stop_and_report_tprof ( limit ) do
362+ :tprof . disable_trace ( :all )
363+ { :call_time , raw } = :tprof . collect ( )
364+ :tprof . stop ( )
365+
366+ grand_total =
367+ raw
368+ |> Enum . map ( fn { _ , _ , _ , pid_data } ->
369+ pid_data |> Enum . map ( fn { _ , _ , time } -> time end ) |> Enum . sum ( )
370+ end )
371+ |> Enum . sum ( )
372+
373+ rows =
374+ raw
375+ |> Enum . map ( fn { mod , fun , arity , pid_data } ->
376+ total_calls = pid_data |> Enum . map ( fn { _ , calls , _ } -> calls end ) |> Enum . sum ( )
377+ total_time = pid_data |> Enum . map ( fn { _ , _ , time } -> time end ) |> Enum . sum ( )
378+ { mod , fun , arity , total_calls , total_time }
379+ end )
380+ |> Enum . sort_by ( fn { _ , _ , _ , _ , time } -> time end , :desc )
381+ |> Enum . take ( limit )
382+
383+ IO . puts ( "" )
384+ announce ( "#{ @ bold } tprof Call Time Profile (top #{ limit } functions):#{ @ reset } " , @ cyan )
385+ IO . puts ( "" )
386+
387+ header =
388+ " #{ String . pad_trailing ( "FUNCTION" , 70 ) } #{ String . pad_leading ( "CALLS" , 12 ) } #{ String . pad_leading ( "TIME (ms)" , 12 ) } #{ String . pad_leading ( "% TOTAL" , 9 ) } "
389+
390+ IO . puts ( header )
391+ IO . puts ( " #{ String . duplicate ( "-" , 105 ) } " )
392+
393+ Enum . each ( rows , fn { mod , fun , arity , calls , time_us } ->
394+ mfa = "#{ inspect ( mod ) } .#{ fun } /#{ arity } "
395+ time_ms = Float . round ( time_us / 1000 , 1 )
396+ pct = if grand_total > 0 , do: Float . round ( time_us / grand_total * 100 , 1 ) , else: 0.0
397+
398+ IO . puts (
399+ " #{ String . pad_trailing ( mfa , 70 ) } #{ String . pad_leading ( format_number ( calls ) , 12 ) } #{ String . pad_leading ( :erlang . float_to_binary ( time_ms , decimals: 1 ) , 12 ) } #{ String . pad_leading ( :erlang . float_to_binary ( pct , decimals: 1 ) <> "%" , 9 ) } "
400+ )
401+ end )
402+
403+ IO . puts ( "" )
404+ total_ms = Float . round ( grand_total / 1000 , 1 )
405+ IO . puts ( " Total traced time: #{ total_ms } ms" )
406+ IO . puts ( "" )
407+ end
408+
328409 defp cleanup_entities ( _consumer , replication ) do
329410 { :ok , database } = Databases . get_db ( replication . postgres_database_id )
330411 { :ok , account } = Accounts . get_account ( database . account_id )
0 commit comments