@@ -59,6 +59,45 @@ defmodule Demo.Pipeline do
5959 end
6060end
6161
62+ defmodule Demo.SecondPipeline do
63+ use Broadway
64+
65+ def start_link ( opts ) do
66+ Broadway . start_link ( __MODULE__ ,
67+ name: __MODULE__ ,
68+ producer: [
69+ module: { Broadway.DummyProducer , opts } ,
70+ concurrency: 1
71+ ] ,
72+ processors: [
73+ default: [ concurrency: 2 ]
74+ ] ,
75+ batchers: [
76+ default: [ batch_size: 6 , concurrency: 2 , batch_timeout: 1000 ] ,
77+ ]
78+ )
79+ end
80+
81+ @ impl true
82+ def handle_message ( _ , % Broadway.Message { } = message , _ ) do
83+ Broadway.Message . update_data ( message , fn data ->
84+ hex = Base . encode16 ( :crypto . strong_rand_bytes ( 64 ) )
85+
86+ String . upcase ( data <> hex )
87+ end )
88+ |> Broadway.Message . put_batcher ( :default )
89+ end
90+
91+ @ impl true
92+ def handle_batch ( :default , messages , _ , _ ) do
93+ Enum . map ( messages , fn message ->
94+ Broadway.Message . update_data ( message , fn data ->
95+ String . downcase ( data )
96+ end )
97+ end )
98+ end
99+ end
100+
62101defmodule FakeProducer do
63102 use GenServer
64103
@@ -74,6 +113,7 @@ defmodule FakeProducer do
74113
75114 def handle_info ( :publish , _timer ) do
76115 for i <- 1 .. 1234 , do: Broadway . test_message ( Demo.Pipeline , "hello #{ i } " )
116+ for i <- 1 .. 100 , do: Broadway . test_message ( Demo.SecondPipeline , "hello #{ i } " )
77117
78118 timer = Process . send_after ( self ( ) , :publish , 100 )
79119
@@ -164,6 +204,7 @@ Task.start(fn ->
164204 children = [
165205 { Phoenix.PubSub , [ name: Demo.PubSub , adapter: Phoenix.PubSub.PG2 ] } ,
166206 Demo.Pipeline ,
207+ Demo.SecondPipeline ,
167208 FakeProducer ,
168209 DemoWeb.Endpoint
169210 ]
0 commit comments