Skip to content

Commit 1abb6a7

Browse files
authored
fix: remove regional_broadcasting feature flag (#1786)
1 parent 289cebf commit 1abb6a7

4 files changed

Lines changed: 57 additions & 72 deletions

File tree

config/runtime.exs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,6 @@ presence_permdown_period = Env.get_integer("PRESENCE_PERMDOWN_PERIOD_IN_MS", 1_2
8181
websocket_max_heap_size = div(Env.get_integer("WEBSOCKET_MAX_HEAP_SIZE", 50_000_000), :erlang.system_info(:wordsize))
8282
users_scope_shards = Env.get_integer("USERS_SCOPE_SHARDS", 5)
8383
postgres_cdc_scope_shards = Env.get_integer("POSTGRES_CDC_SCOPE_SHARDS", 5)
84-
regional_broadcasting = Env.get_boolean("REGIONAL_BROADCASTING", false)
8584
no_channel_timeout_in_ms = Env.get_integer("NO_CHANNEL_TIMEOUT_IN_MS", :timer.minutes(10))
8685
measure_traffic_interval_in_ms = Env.get_integer("MEASURE_TRAFFIC_INTERVAL_IN_MS", :timer.seconds(10))
8786
metrics_pusher_enabled = Env.get_boolean("METRICS_PUSHER_ENABLED", false)
@@ -169,7 +168,6 @@ config :realtime,
169168
presence_permdown_period: presence_permdown_period,
170169
users_scope_shards: users_scope_shards,
171170
postgres_cdc_scope_shards: postgres_cdc_scope_shards,
172-
regional_broadcasting: regional_broadcasting,
173171
master_region: master_region,
174172
region_mapping: region_mapping,
175173
metrics_tags: metrics_tags,

config/test.exs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ config :realtime, RealtimeWeb.Endpoint,
3232
System.put_env("REGION", "us-east-1")
3333

3434
config :realtime,
35-
regional_broadcasting: true,
3635
region: "us-east-1",
3736
db_enc_key: "1234567890123456",
3837
jwt_claim_validators: System.get_env("JWT_CLAIM_VALIDATORS", "{}"),

lib/realtime/gen_rpc/pub_sub.ex

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -48,20 +48,16 @@ defmodule Realtime.GenRpcPubSub do
4848
def broadcast(adapter_name, topic, message, dispatcher) do
4949
worker = worker_name(adapter_name, self())
5050

51-
if Application.get_env(:realtime, :regional_broadcasting, false) do
52-
my_region = Application.get_env(:realtime, :region)
53-
# broadcast to all other nodes in the region
51+
my_region = Application.get_env(:realtime, :region)
52+
# broadcast to all other nodes in the region
5453

55-
other_nodes = for node <- Realtime.Nodes.region_nodes(my_region), node != node(), do: node
56-
GenRpc.abcast(other_nodes, worker, Worker.forward_to_local(topic, message, dispatcher), key: self())
54+
other_nodes = for node <- Realtime.Nodes.region_nodes(my_region), node != node(), do: node
55+
GenRpc.abcast(other_nodes, worker, Worker.forward_to_local(topic, message, dispatcher), key: self())
5756

58-
# send a message to a node in each region to forward to the rest of the region
59-
other_region_nodes = nodes_from_other_regions(my_region, self())
57+
# send a message to a node in each region to forward to the rest of the region
58+
other_region_nodes = nodes_from_other_regions(my_region, self())
6059

61-
GenRpc.abcast(other_region_nodes, worker, Worker.forward_to_region(topic, message, dispatcher), key: self())
62-
else
63-
GenRpc.abcast(Node.list(), worker, Worker.forward_to_local(topic, message, dispatcher), key: self())
64-
end
60+
GenRpc.abcast(other_region_nodes, worker, Worker.forward_to_region(topic, message, dispatcher), key: self())
6561

6662
:ok
6763
end

test/realtime/gen_rpc_pub_sub_test.exs

Lines changed: 50 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -45,84 +45,76 @@ defmodule Realtime.GenRpcPubSubTest do
4545

4646
@topic "gen-rpc-pub-sub-test-topic"
4747

48-
for regional_broadcasting <- [true, false] do
49-
describe "regional balancing = #{regional_broadcasting}" do
50-
setup do
51-
previous_region = Application.get_env(:realtime, :region)
52-
Application.put_env(:realtime, :region, "us-east-1")
53-
on_exit(fn -> Application.put_env(:realtime, :region, previous_region) end)
48+
describe "regional broadcasting" do
49+
setup do
50+
previous_region = Application.get_env(:realtime, :region)
51+
Application.put_env(:realtime, :region, "us-east-1")
52+
on_exit(fn -> Application.put_env(:realtime, :region, previous_region) end)
5453

55-
previous_regional_broadcast = Application.get_env(:realtime, :regional_broadcasting)
56-
Application.put_env(:realtime, :regional_broadcasting, unquote(regional_broadcasting))
57-
on_exit(fn -> Application.put_env(:realtime, :regional_broadcasting, previous_regional_broadcast) end)
58-
59-
:ok
60-
end
61-
62-
@describetag regional_broadcasting: regional_broadcasting
54+
:ok
55+
end
6356

64-
test "all messages are received" do
65-
# start 1 node in us-east-1 to test my region broadcasting
66-
# start 2 nodes in ap-southeast-2 to test other region broadcasting
57+
test "all messages are received" do
58+
# start 1 node in us-east-1 to test my region broadcasting
59+
# start 2 nodes in ap-southeast-2 to test other region broadcasting
6760

68-
us_node = :us_node
69-
ap2_nodeX = :ap2_nodeX
70-
ap2_nodeY = :ap2_nodeY
61+
us_node = :us_node
62+
ap2_nodeX = :ap2_nodeX
63+
ap2_nodeY = :ap2_nodeY
7164

72-
# Avoid port collision
73-
gen_rpc_port = Application.fetch_env!(:gen_rpc, :tcp_server_port)
65+
# Avoid port collision
66+
gen_rpc_port = Application.fetch_env!(:gen_rpc, :tcp_server_port)
7467

75-
client_config_per_node = %{
76-
node() => gen_rpc_port,
77-
:"#{us_node}@127.0.0.1" => 16970,
78-
:"#{ap2_nodeX}@127.0.0.1" => 16971,
79-
:"#{ap2_nodeY}@127.0.0.1" => 16972
80-
}
68+
client_config_per_node = %{
69+
node() => gen_rpc_port,
70+
:"#{us_node}@127.0.0.1" => 16970,
71+
:"#{ap2_nodeX}@127.0.0.1" => 16971,
72+
:"#{ap2_nodeY}@127.0.0.1" => 16972
73+
}
8174

82-
extra_config = [{:gen_rpc, :client_config_per_node, {:internal, client_config_per_node}}]
75+
extra_config = [{:gen_rpc, :client_config_per_node, {:internal, client_config_per_node}}]
8376

84-
on_exit(fn -> Application.put_env(:gen_rpc, :client_config_per_node, {:internal, %{}}) end)
85-
Application.put_env(:gen_rpc, :client_config_per_node, {:internal, client_config_per_node})
77+
on_exit(fn -> Application.put_env(:gen_rpc, :client_config_per_node, {:internal, %{}}) end)
78+
Application.put_env(:gen_rpc, :client_config_per_node, {:internal, client_config_per_node})
8679

87-
us_extra_config =
88-
[{:realtime, :region, "us-east-1"}, {:gen_rpc, :tcp_server_port, 16970}] ++ extra_config
80+
us_extra_config =
81+
[{:realtime, :region, "us-east-1"}, {:gen_rpc, :tcp_server_port, 16970}] ++ extra_config
8982

90-
{:ok, _} = Clustered.start(@aux_mod, name: us_node, extra_config: us_extra_config, phoenix_port: 4014)
83+
{:ok, _} = Clustered.start(@aux_mod, name: us_node, extra_config: us_extra_config, phoenix_port: 4014)
9184

92-
ap2_nodeX_extra_config =
93-
[{:realtime, :region, "ap-southeast-2"}, {:gen_rpc, :tcp_server_port, 16971}] ++ extra_config
85+
ap2_nodeX_extra_config =
86+
[{:realtime, :region, "ap-southeast-2"}, {:gen_rpc, :tcp_server_port, 16971}] ++ extra_config
9487

95-
{:ok, _} = Clustered.start(@aux_mod, name: ap2_nodeX, extra_config: ap2_nodeX_extra_config, phoenix_port: 4015)
88+
{:ok, _} = Clustered.start(@aux_mod, name: ap2_nodeX, extra_config: ap2_nodeX_extra_config, phoenix_port: 4015)
9689

97-
ap2_nodeY_extra_config =
98-
[{:realtime, :region, "ap-southeast-2"}, {:gen_rpc, :tcp_server_port, 16972}] ++ extra_config
90+
ap2_nodeY_extra_config =
91+
[{:realtime, :region, "ap-southeast-2"}, {:gen_rpc, :tcp_server_port, 16972}] ++ extra_config
9992

100-
{:ok, _} = Clustered.start(@aux_mod, name: ap2_nodeY, extra_config: ap2_nodeY_extra_config, phoenix_port: 4016)
93+
{:ok, _} = Clustered.start(@aux_mod, name: ap2_nodeY, extra_config: ap2_nodeY_extra_config, phoenix_port: 4016)
10194

102-
# Ensuring that syn had enough time to propagate to all nodes the group information
103-
Process.sleep(3000)
95+
# Ensuring that syn had enough time to propagate to all nodes the group information
96+
Process.sleep(3000)
10497

105-
RealtimeWeb.Endpoint.subscribe(@topic)
106-
:erpc.multicall(Node.list(), Subscriber, :subscribe, [self(), @topic])
98+
RealtimeWeb.Endpoint.subscribe(@topic)
99+
:erpc.multicall(Node.list(), Subscriber, :subscribe, [self(), @topic])
107100

108-
assert length(Realtime.Nodes.region_nodes("us-east-1")) == 2
109-
assert length(Realtime.Nodes.region_nodes("ap-southeast-2")) == 2
101+
assert length(Realtime.Nodes.region_nodes("us-east-1")) == 2
102+
assert length(Realtime.Nodes.region_nodes("ap-southeast-2")) == 2
110103

111-
assert_receive {:ready, "us-east-1"}
112-
assert_receive {:ready, "ap-southeast-2"}
113-
assert_receive {:ready, "ap-southeast-2"}
104+
assert_receive {:ready, "us-east-1"}
105+
assert_receive {:ready, "ap-southeast-2"}
106+
assert_receive {:ready, "ap-southeast-2"}
114107

115-
message = %Phoenix.Socket.Broadcast{topic: @topic, event: "an event", payload: ["a", %{"b" => "c"}, 1, 23]}
116-
Phoenix.PubSub.broadcast(Realtime.PubSub, @topic, message)
108+
message = %Phoenix.Socket.Broadcast{topic: @topic, event: "an event", payload: ["a", %{"b" => "c"}, 1, 23]}
109+
Phoenix.PubSub.broadcast(Realtime.PubSub, @topic, message)
117110

118-
assert_receive ^message
111+
assert_receive ^message
119112

120-
# Remote nodes received the broadcast
121-
assert_receive {:relay, :"us_node@127.0.0.1", ^message}, 5000
122-
assert_receive {:relay, :"ap2_nodeX@127.0.0.1", ^message}, 1000
123-
assert_receive {:relay, :"ap2_nodeY@127.0.0.1", ^message}, 1000
124-
refute_receive _any
125-
end
113+
# Remote nodes received the broadcast
114+
assert_receive {:relay, :"us_node@127.0.0.1", ^message}, 5000
115+
assert_receive {:relay, :"ap2_nodeX@127.0.0.1", ^message}, 1000
116+
assert_receive {:relay, :"ap2_nodeY@127.0.0.1", ^message}, 1000
117+
refute_receive _any
126118
end
127119
end
128120
end

0 commit comments

Comments
 (0)