Skip to content

Commit 1c9ab71

Browse files
Rafal StudnickiSimon Zelazny
authored andcommitted
Tracker consists of shard-pool; exposes dirty_list operation
1 parent 1ce6cb8 commit 1c9ab71

11 files changed

Lines changed: 1231 additions & 939 deletions

File tree

lib/phoenix/tracker.ex

Lines changed: 89 additions & 453 deletions
Large diffs are not rendered by default.

lib/phoenix/tracker/shard.ex

Lines changed: 497 additions & 0 deletions
Large diffs are not rendered by default.

lib/phoenix/tracker/state.ex

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ defmodule Phoenix.Tracker.State do
1616
@type context :: %{name => clock}
1717
@type values :: ets_id | :extracted | %{tag => {pid, topic, key, meta}}
1818
@type value :: {{topic, pid, key}, meta, tag}
19+
@type key_meta :: {key, meta}
1920
@type delta :: %State{mode: :delta}
2021
@type pid_lookup :: {pid, topic, key}
2122

@@ -50,13 +51,13 @@ defmodule Phoenix.Tracker.State do
5051
%Phoenix.Tracker.State{...}
5152
5253
"""
53-
@spec new(name) :: t
54-
def new(replica) do
54+
@spec new(name, atom) :: t
55+
def new(replica, shard_name) do
5556
reset_delta(%State{
5657
replica: replica,
5758
context: %{replica => 0},
5859
mode: :normal,
59-
values: :ets.new(:values, [:ordered_set]),
60+
values: :ets.new(shard_name, [:named_table, :protected, :ordered_set]),
6061
pids: :ets.new(:pids, [:duplicate_bag]),
6162
replicas: %{replica => :up}})
6263
end
@@ -113,12 +114,22 @@ defmodule Phoenix.Tracker.State do
113114
@doc """
114115
Returns a list of elements for the topic who belong to an online replica.
115116
"""
116-
@spec get_by_topic(t, topic) :: [value]
117+
@spec get_by_topic(t, topic) :: [key_meta]
117118
def get_by_topic(%State{values: values} = state, topic) do
118-
replicas = down_replicas(state)
119-
:ets.select(values, [{ {{topic, :_, :_}, :_, {:"$1", :_}},
120-
not_in(:"$1", replicas), [:"$_"]}])
119+
tracked_values(values, topic, down_replicas(state))
121120
end
121+
122+
@doc """
123+
Performs table lookup for tracked elements in the topic, filtering out
124+
those present on downed replicas.
125+
"""
126+
def tracked_values(table, topic, down_replicas) do
127+
:ets.select(table,
128+
[{{{topic, :_, :"$1"}, :"$2", {:"$3", :_}},
129+
not_in(:"$3", down_replicas),
130+
[{{:"$1", :"$2"}}]}])
131+
end
132+
122133
defp not_in(_pos, []), do: []
123134
defp not_in(pos, replicas), do: [not: ors(pos, replicas)]
124135
defp ors(pos, [rep]), do: {:"==", pos, {rep}}

test/phoenix/pubsub/pg2_test.exs

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ defmodule Phoenix.PubSub.PG2Test do
1111
@node1 :"node1@127.0.0.1"
1212
@node2 :"node2@127.0.0.1"
1313

14+
@receive_timeout 500
15+
1416
setup config do
1517
size = config[:pool_size] || 1
1618
if config[:pool_size] do
@@ -29,15 +31,15 @@ defmodule Phoenix.PubSub.PG2Test do
2931

3032
PubSub.subscribe(config.pubsub, config.topic)
3133
:ok = PubSub.direct_broadcast(@node1, config.pubsub, config.topic, :ping)
32-
assert_receive {@node1, :ping}
34+
assert_receive {@node1, :ping}, @receive_timeout
3335
:ok = PubSub.direct_broadcast!(@node1, config.pubsub, config.topic, :ping)
34-
assert_receive {@node1, :ping}
36+
assert_receive {@node1, :ping}, @receive_timeout
3537

3638
:ok = PubSub.direct_broadcast(@node2, config.pubsub, config.topic, :ping)
37-
refute_receive {@node1, :ping}
39+
refute_receive {@node1, :ping}, @receive_timeout
3840

3941
:ok = PubSub.direct_broadcast!(@node2, config.pubsub, config.topic, :ping)
40-
refute_receive {@node1, :ping}
42+
refute_receive {@node1, :ping}, @receive_timeout
4143
end
4244

4345
@tag pool_size: size, topic: topic
@@ -46,15 +48,15 @@ defmodule Phoenix.PubSub.PG2Test do
4648

4749
PubSub.subscribe(config.pubsub, config.topic)
4850
:ok = PubSub.direct_broadcast_from(@node1, config.pubsub, self(), config.topic, :ping)
49-
assert_receive {@node1, :ping}
51+
assert_receive {@node1, :ping}, @receive_timeout
5052
:ok = PubSub.direct_broadcast_from!(@node1, config.pubsub, self(), config.topic, :ping)
51-
assert_receive {@node1, :ping}
53+
assert_receive {@node1, :ping}, @receive_timeout
5254

5355
:ok = PubSub.direct_broadcast_from(@node2, config.pubsub, self(), config.topic, :ping)
54-
refute_receive {@node1, :ping}
56+
refute_receive {@node1, :ping}, @receive_timeout
5557

5658
:ok = PubSub.direct_broadcast_from!(@node2, config.pubsub, self(), config.topic, :ping)
57-
refute_receive {@node1, :ping}
59+
refute_receive {@node1, :ping}, @receive_timeout
5860
end
5961
end
6062

test/phoenix/tracker/delta_generation_test.exs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@ defmodule Phoenix.Tracker.DeltaGenerationTest do
99
|> Enum.sort()
1010
end
1111

12-
defp new(node) do
13-
State.new(node)
12+
defp new(node, config) do
13+
State.new(node, :"#{node} #{config.test}")
1414
end
1515

1616
defp new_pid() do
@@ -23,9 +23,9 @@ defmodule Phoenix.Tracker.DeltaGenerationTest do
2323
|> Enum.sort()
2424
end
2525

26-
test "generations" do
27-
s1 = new(:s1)
28-
s2 = new(:s2)
26+
test "generations", config do
27+
s1 = new(:s1, config)
28+
s2 = new(:s2, config)
2929
s1 = State.join(s1, new_pid(), "lobby", "user1", %{})
3030
assert [gen1, gen1, gen1] = gens = push(s1, [], s1.delta, [2, 5, 6])
3131
assert keys(gen1) == ["user1"]
@@ -86,9 +86,9 @@ defmodule Phoenix.Tracker.DeltaGenerationTest do
8686
assert sorted_clouds(gen3.clouds) == [{:s1, 3}, {:s1, 4}, {:s2, 1}, {:s2, 2}]
8787
end
8888

89-
test "does not include non-contiguous deltas" do
90-
s1 = new(:s1)
91-
s3 = new(:s3)
89+
test "does not include non-contiguous deltas", config do
90+
s1 = new(:s1, config)
91+
s3 = new(:s3, config)
9292
s1 = State.join(s1, new_pid(), "lobby", "user1", %{})
9393
old_s3 = s3 = State.join(s3, new_pid(), "lobby", "user3", %{})
9494
s3 = State.reset_delta(s3)
@@ -99,10 +99,10 @@ defmodule Phoenix.Tracker.DeltaGenerationTest do
9999
assert [^gen1, ^gen1, ^gen1] = push(s1, gens, s3.delta, [5, 10, 15])
100100
end
101101

102-
test "remove_down_replicas" do
103-
s1 = new(:s1)
104-
s2 = new(:s2)
105-
s3 = new(:s3)
102+
test "remove_down_replicas", config do
103+
s1 = new(:s1, config)
104+
s2 = new(:s2, config)
105+
s3 = new(:s3, config)
106106
s2 = State.join(s2, new_pid(), "lobby", "user2", %{})
107107
assert [gen1, gen1, gen1] = gens = push(s1, [], s2.delta, [5, 10, 15])
108108
assert [pruned_gen1, pruned_gen1, pruned_gen1] = DeltaGeneration.remove_down_replicas(gens, :s2)

0 commit comments

Comments
 (0)