Skip to content

Commit 228b82e

Browse files
authored
Add configurable default dispatcher (#209)
Adds a :dispatcher option to child_spec/start_link that sets the default dispatcher module for all broadcast functions. Defaults to Phoenix.PubSub (preserving existing behavior). Can be overridden per-call by passing a dispatcher to broadcast/4 and friends. {Phoenix.PubSub, name: :my_pubsub, dispatcher: MyApp.Dispatcher} The internal meta :pubsub value is extended from {adapter, name} to {adapter, name, dispatcher}. Single Registry.meta lookup per broadcast call, same as before.
1 parent 0b63dce commit 228b82e

3 files changed

Lines changed: 106 additions & 17 deletions

File tree

lib/phoenix/pubsub.ex

Lines changed: 22 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,9 @@ defmodule Phoenix.PubSub do
171171
* `:broadcast_pool_size` - number of pubsub partitions used for broadcasting messages
172172
(defaults to `:pool_size`). This option is used during pool size migrations to ensure
173173
no messages are lost. See the "Safe Pool Size Migration" section in the module documentation.
174+
* `:dispatcher` - the default dispatcher module for broadcasts
175+
(defaults to `Phoenix.PubSub`). Can be overridden per-call by
176+
passing a dispatcher to `broadcast/4` and friends.
174177
175178
"""
176179
@spec child_spec(keyword) :: Supervisor.child_spec()
@@ -249,9 +252,10 @@ defmodule Phoenix.PubSub do
249252
See the "Custom dispatching" section in the module documentation.
250253
"""
251254
@spec broadcast(t, topic, message, dispatcher) :: :ok | {:error, term}
252-
def broadcast(pubsub, topic, message, dispatcher \\ __MODULE__)
255+
def broadcast(pubsub, topic, message, dispatcher \\ nil)
253256
when is_atom(pubsub) and is_binary(topic) and is_atom(dispatcher) do
254-
{:ok, {adapter, name}} = Registry.meta(pubsub, :pubsub)
257+
{:ok, {adapter, name, default_dispatcher}} = Registry.meta(pubsub, :pubsub)
258+
dispatcher = dispatcher || default_dispatcher
255259

256260
with :ok <- adapter.broadcast(name, topic, message, dispatcher) do
257261
dispatch(pubsub, :none, topic, message, dispatcher)
@@ -273,9 +277,10 @@ defmodule Phoenix.PubSub do
273277
See the "Custom dispatching" section in the module documentation.
274278
"""
275279
@spec broadcast_from(t, pid, topic, message, dispatcher) :: :ok | {:error, term}
276-
def broadcast_from(pubsub, from, topic, message, dispatcher \\ __MODULE__)
280+
def broadcast_from(pubsub, from, topic, message, dispatcher \\ nil)
277281
when is_atom(pubsub) and is_pid(from) and is_binary(topic) and is_atom(dispatcher) do
278-
{:ok, {adapter, name}} = Registry.meta(pubsub, :pubsub)
282+
{:ok, {adapter, name, default_dispatcher}} = Registry.meta(pubsub, :pubsub)
283+
dispatcher = dispatcher || default_dispatcher
279284

280285
with :ok <- adapter.broadcast(name, topic, message, dispatcher) do
281286
dispatch(pubsub, from, topic, message, dispatcher)
@@ -293,9 +298,10 @@ defmodule Phoenix.PubSub do
293298
See the "Custom dispatching" section in the module documentation.
294299
"""
295300
@spec local_broadcast(t, topic, message, dispatcher) :: :ok
296-
def local_broadcast(pubsub, topic, message, dispatcher \\ __MODULE__)
301+
def local_broadcast(pubsub, topic, message, dispatcher \\ nil)
297302
when is_atom(pubsub) and is_binary(topic) and is_atom(dispatcher) do
298-
dispatch(pubsub, :none, topic, message, dispatcher)
303+
{:ok, {_adapter, _name, default_dispatcher}} = Registry.meta(pubsub, :pubsub)
304+
dispatch(pubsub, :none, topic, message, dispatcher || default_dispatcher)
299305
end
300306

301307
@doc """
@@ -313,9 +319,10 @@ defmodule Phoenix.PubSub do
313319
See the "Custom dispatching" section in the module documentation.
314320
"""
315321
@spec local_broadcast_from(t, pid, topic, message, dispatcher) :: :ok
316-
def local_broadcast_from(pubsub, from, topic, message, dispatcher \\ __MODULE__)
322+
def local_broadcast_from(pubsub, from, topic, message, dispatcher \\ nil)
317323
when is_atom(pubsub) and is_pid(from) and is_binary(topic) and is_atom(dispatcher) do
318-
dispatch(pubsub, from, topic, message, dispatcher)
324+
{:ok, {_adapter, _name, default_dispatcher}} = Registry.meta(pubsub, :pubsub)
325+
dispatch(pubsub, from, topic, message, dispatcher || default_dispatcher)
319326
end
320327

321328
@doc """
@@ -333,17 +340,17 @@ defmodule Phoenix.PubSub do
333340
See the "Custom dispatching" section in the module documentation.
334341
"""
335342
@spec direct_broadcast(node_name, t, topic, message, dispatcher) :: :ok | {:error, term}
336-
def direct_broadcast(node_name, pubsub, topic, message, dispatcher \\ __MODULE__)
343+
def direct_broadcast(node_name, pubsub, topic, message, dispatcher \\ nil)
337344
when is_atom(pubsub) and is_binary(topic) and is_atom(dispatcher) do
338-
{:ok, {adapter, name}} = Registry.meta(pubsub, :pubsub)
339-
adapter.direct_broadcast(name, node_name, topic, message, dispatcher)
345+
{:ok, {adapter, name, default_dispatcher}} = Registry.meta(pubsub, :pubsub)
346+
adapter.direct_broadcast(name, node_name, topic, message, dispatcher || default_dispatcher)
340347
end
341348

342349
@doc """
343350
Raising version of `broadcast/4`.
344351
"""
345352
@spec broadcast!(t, topic, message, dispatcher) :: :ok
346-
def broadcast!(pubsub, topic, message, dispatcher \\ __MODULE__) do
353+
def broadcast!(pubsub, topic, message, dispatcher \\ nil) do
347354
case broadcast(pubsub, topic, message, dispatcher) do
348355
:ok -> :ok
349356
{:error, error} -> raise BroadcastError, "broadcast failed: #{inspect(error)}"
@@ -354,7 +361,7 @@ defmodule Phoenix.PubSub do
354361
Raising version of `broadcast_from/5`.
355362
"""
356363
@spec broadcast_from!(t, pid, topic, message, dispatcher) :: :ok
357-
def broadcast_from!(pubsub, from, topic, message, dispatcher \\ __MODULE__) do
364+
def broadcast_from!(pubsub, from, topic, message, dispatcher \\ nil) do
358365
case broadcast_from(pubsub, from, topic, message, dispatcher) do
359366
:ok -> :ok
360367
{:error, error} -> raise BroadcastError, "broadcast failed: #{inspect(error)}"
@@ -365,7 +372,7 @@ defmodule Phoenix.PubSub do
365372
Raising version of `direct_broadcast/5`.
366373
"""
367374
@spec direct_broadcast!(node_name, t, topic, message, dispatcher) :: :ok
368-
def direct_broadcast!(node_name, pubsub, topic, message, dispatcher \\ __MODULE__) do
375+
def direct_broadcast!(node_name, pubsub, topic, message, dispatcher \\ nil) do
369376
case direct_broadcast(node_name, pubsub, topic, message, dispatcher) do
370377
:ok -> :ok
371378
{:error, error} -> raise BroadcastError, "broadcast failed: #{inspect(error)}"
@@ -377,7 +384,7 @@ defmodule Phoenix.PubSub do
377384
"""
378385
@spec node_name(t) :: node_name
379386
def node_name(pubsub) do
380-
{:ok, {adapter, name}} = Registry.meta(pubsub, :pubsub)
387+
{:ok, {adapter, name, _dispatcher}} = Registry.meta(pubsub, :pubsub)
381388
adapter.node_name(name)
382389
end
383390

lib/phoenix/pubsub/supervisor.ex

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,10 @@ defmodule Phoenix.PubSub.Supervisor do
2525
opts[:registry_size] || opts[:pool_size] ||
2626
System.schedulers_online() |> Kernel./(4) |> Float.ceil() |> trunc()
2727

28+
dispatcher = Keyword.get(opts, :dispatcher, Phoenix.PubSub)
29+
2830
registry = [
29-
meta: [pubsub: {adapter, adapter_name}],
31+
meta: [pubsub: {adapter, adapter_name, dispatcher}],
3032
partitions: partitions,
3133
keys: :duplicate,
3234
name: name

test/phoenix/pubsub_test.exs

Lines changed: 81 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
defmodule Phoenix.PubSub.UnitTest do
22
use ExUnit.Case, async: true
33

4+
alias Phoenix.PubSub
5+
46
describe "child_spec/1" do
57
test "expects a name" do
6-
{:error, {{:EXIT, {exception, _}}, _}} = start_supervised({Phoenix.PubSub, []})
8+
{:error, {{:EXIT, {exception, _}}, _}} = start_supervised({PubSub, []})
79

810
assert Exception.message(exception) ==
911
"the :name option is required when starting Phoenix.PubSub"
@@ -22,4 +24,82 @@ defmodule Phoenix.PubSub.UnitTest do
2224
:"#{__MODULE__}_#{:crypto.strong_rand_bytes(8) |> Base.encode16()}"
2325
end
2426
end
27+
28+
describe "default dispatcher" do
29+
defmodule TestDispatcher do
30+
def dispatch(entries, :none, message) do
31+
for {pid, _} <- entries do
32+
send(pid, {:custom_dispatched, message})
33+
end
34+
35+
:ok
36+
end
37+
38+
def dispatch(entries, from, message) do
39+
for {pid, _} <- entries, pid != from do
40+
send(pid, {:custom_dispatched, message})
41+
end
42+
43+
:ok
44+
end
45+
end
46+
47+
test "defaults to Phoenix.PubSub when no dispatcher configured" do
48+
name = :"ps_default_#{:erlang.unique_integer([:positive])}"
49+
start_supervised!({PubSub, name: name})
50+
51+
PubSub.subscribe(name, "topic")
52+
PubSub.broadcast(name, "topic", :hello)
53+
assert_receive :hello
54+
end
55+
56+
test "uses configured dispatcher for broadcast/3" do
57+
name = :"ps_custom_#{:erlang.unique_integer([:positive])}"
58+
start_supervised!({PubSub, name: name, dispatcher: TestDispatcher})
59+
60+
PubSub.subscribe(name, "topic")
61+
PubSub.broadcast(name, "topic", :hello)
62+
assert_receive {:custom_dispatched, :hello}
63+
refute_received :hello
64+
end
65+
66+
test "uses configured dispatcher for local_broadcast/3" do
67+
name = :"ps_local_#{:erlang.unique_integer([:positive])}"
68+
start_supervised!({PubSub, name: name, dispatcher: TestDispatcher})
69+
70+
PubSub.subscribe(name, "topic")
71+
PubSub.local_broadcast(name, "topic", :hello)
72+
assert_receive {:custom_dispatched, :hello}
73+
end
74+
75+
test "uses configured dispatcher for broadcast_from/4" do
76+
name = :"ps_from_#{:erlang.unique_integer([:positive])}"
77+
start_supervised!({PubSub, name: name, dispatcher: TestDispatcher})
78+
79+
PubSub.subscribe(name, "topic")
80+
other = spawn(fn -> Process.sleep(:infinity) end)
81+
PubSub.broadcast_from(name, other, "topic", :hello)
82+
assert_receive {:custom_dispatched, :hello}
83+
end
84+
85+
test "explicit dispatcher overrides the configured default" do
86+
name = :"ps_override_#{:erlang.unique_integer([:positive])}"
87+
start_supervised!({PubSub, name: name, dispatcher: TestDispatcher})
88+
89+
PubSub.subscribe(name, "topic")
90+
# Pass Phoenix.PubSub explicitly to override the configured TestDispatcher
91+
PubSub.broadcast(name, "topic", :hello, PubSub)
92+
assert_receive :hello
93+
refute_received {:custom_dispatched, :hello}
94+
end
95+
96+
test "bang variants use configured dispatcher" do
97+
name = :"ps_bang_#{:erlang.unique_integer([:positive])}"
98+
start_supervised!({PubSub, name: name, dispatcher: TestDispatcher})
99+
100+
PubSub.subscribe(name, "topic")
101+
PubSub.broadcast!(name, "topic", :hello)
102+
assert_receive {:custom_dispatched, :hello}
103+
end
104+
end
25105
end

0 commit comments

Comments
 (0)