Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ presence_permdown_period = Env.get_integer("PRESENCE_PERMDOWN_PERIOD_IN_MS", 1_2
websocket_max_heap_size = div(Env.get_integer("WEBSOCKET_MAX_HEAP_SIZE", 50_000_000), :erlang.system_info(:wordsize))
users_scope_shards = Env.get_integer("USERS_SCOPE_SHARDS", 5)
postgres_cdc_scope_shards = Env.get_integer("POSTGRES_CDC_SCOPE_SHARDS", 5)
regional_broadcasting = Env.get_boolean("REGIONAL_BROADCASTING", false)
no_channel_timeout_in_ms = Env.get_integer("NO_CHANNEL_TIMEOUT_IN_MS", :timer.minutes(10))
measure_traffic_interval_in_ms = Env.get_integer("MEASURE_TRAFFIC_INTERVAL_IN_MS", :timer.seconds(10))
metrics_pusher_enabled = Env.get_boolean("METRICS_PUSHER_ENABLED", false)
Expand Down Expand Up @@ -169,7 +168,6 @@ config :realtime,
presence_permdown_period: presence_permdown_period,
users_scope_shards: users_scope_shards,
postgres_cdc_scope_shards: postgres_cdc_scope_shards,
regional_broadcasting: regional_broadcasting,
master_region: master_region,
region_mapping: region_mapping,
metrics_tags: metrics_tags,
Expand Down
1 change: 0 additions & 1 deletion config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ config :realtime, RealtimeWeb.Endpoint,
System.put_env("REGION", "us-east-1")

config :realtime,
regional_broadcasting: true,
region: "us-east-1",
db_enc_key: "1234567890123456",
jwt_claim_validators: System.get_env("JWT_CLAIM_VALIDATORS", "{}"),
Expand Down
18 changes: 7 additions & 11 deletions lib/realtime/gen_rpc/pub_sub.ex
Original file line number Diff line number Diff line change
Expand Up @@ -48,20 +48,16 @@ defmodule Realtime.GenRpcPubSub do
def broadcast(adapter_name, topic, message, dispatcher) do
worker = worker_name(adapter_name, self())

if Application.get_env(:realtime, :regional_broadcasting, false) do
my_region = Application.get_env(:realtime, :region)
# broadcast to all other nodes in the region
my_region = Application.get_env(:realtime, :region)
# broadcast to all other nodes in the region

other_nodes = for node <- Realtime.Nodes.region_nodes(my_region), node != node(), do: node
GenRpc.abcast(other_nodes, worker, Worker.forward_to_local(topic, message, dispatcher), key: self())
other_nodes = for node <- Realtime.Nodes.region_nodes(my_region), node != node(), do: node
GenRpc.abcast(other_nodes, worker, Worker.forward_to_local(topic, message, dispatcher), key: self())

# send a message to a node in each region to forward to the rest of the region
other_region_nodes = nodes_from_other_regions(my_region, self())
# send a message to a node in each region to forward to the rest of the region
other_region_nodes = nodes_from_other_regions(my_region, self())

GenRpc.abcast(other_region_nodes, worker, Worker.forward_to_region(topic, message, dispatcher), key: self())
else
GenRpc.abcast(Node.list(), worker, Worker.forward_to_local(topic, message, dispatcher), key: self())
end
GenRpc.abcast(other_region_nodes, worker, Worker.forward_to_region(topic, message, dispatcher), key: self())

:ok
end
Expand Down
108 changes: 50 additions & 58 deletions test/realtime/gen_rpc_pub_sub_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -45,84 +45,76 @@ defmodule Realtime.GenRpcPubSubTest do

@topic "gen-rpc-pub-sub-test-topic"

for regional_broadcasting <- [true, false] do
describe "regional balancing = #{regional_broadcasting}" do
setup do
previous_region = Application.get_env(:realtime, :region)
Application.put_env(:realtime, :region, "us-east-1")
on_exit(fn -> Application.put_env(:realtime, :region, previous_region) end)
describe "regional broadcasting" do
setup do
previous_region = Application.get_env(:realtime, :region)
Application.put_env(:realtime, :region, "us-east-1")
on_exit(fn -> Application.put_env(:realtime, :region, previous_region) end)

previous_regional_broadcast = Application.get_env(:realtime, :regional_broadcasting)
Application.put_env(:realtime, :regional_broadcasting, unquote(regional_broadcasting))
on_exit(fn -> Application.put_env(:realtime, :regional_broadcasting, previous_regional_broadcast) end)

:ok
end

@describetag regional_broadcasting: regional_broadcasting
:ok
end

test "all messages are received" do
# start 1 node in us-east-1 to test my region broadcasting
# start 2 nodes in ap-southeast-2 to test other region broadcasting
test "all messages are received" do
# start 1 node in us-east-1 to test my region broadcasting
# start 2 nodes in ap-southeast-2 to test other region broadcasting

us_node = :us_node
ap2_nodeX = :ap2_nodeX
ap2_nodeY = :ap2_nodeY
us_node = :us_node
ap2_nodeX = :ap2_nodeX
ap2_nodeY = :ap2_nodeY

# Avoid port collision
gen_rpc_port = Application.fetch_env!(:gen_rpc, :tcp_server_port)
# Avoid port collision
gen_rpc_port = Application.fetch_env!(:gen_rpc, :tcp_server_port)

client_config_per_node = %{
node() => gen_rpc_port,
:"#{us_node}@127.0.0.1" => 16970,
:"#{ap2_nodeX}@127.0.0.1" => 16971,
:"#{ap2_nodeY}@127.0.0.1" => 16972
}
client_config_per_node = %{
node() => gen_rpc_port,
:"#{us_node}@127.0.0.1" => 16970,
:"#{ap2_nodeX}@127.0.0.1" => 16971,
:"#{ap2_nodeY}@127.0.0.1" => 16972
}

extra_config = [{:gen_rpc, :client_config_per_node, {:internal, client_config_per_node}}]
extra_config = [{:gen_rpc, :client_config_per_node, {:internal, client_config_per_node}}]

on_exit(fn -> Application.put_env(:gen_rpc, :client_config_per_node, {:internal, %{}}) end)
Application.put_env(:gen_rpc, :client_config_per_node, {:internal, client_config_per_node})
on_exit(fn -> Application.put_env(:gen_rpc, :client_config_per_node, {:internal, %{}}) end)
Application.put_env(:gen_rpc, :client_config_per_node, {:internal, client_config_per_node})

us_extra_config =
[{:realtime, :region, "us-east-1"}, {:gen_rpc, :tcp_server_port, 16970}] ++ extra_config
us_extra_config =
[{:realtime, :region, "us-east-1"}, {:gen_rpc, :tcp_server_port, 16970}] ++ extra_config

{:ok, _} = Clustered.start(@aux_mod, name: us_node, extra_config: us_extra_config, phoenix_port: 4014)
{:ok, _} = Clustered.start(@aux_mod, name: us_node, extra_config: us_extra_config, phoenix_port: 4014)

ap2_nodeX_extra_config =
[{:realtime, :region, "ap-southeast-2"}, {:gen_rpc, :tcp_server_port, 16971}] ++ extra_config
ap2_nodeX_extra_config =
[{:realtime, :region, "ap-southeast-2"}, {:gen_rpc, :tcp_server_port, 16971}] ++ extra_config

{:ok, _} = Clustered.start(@aux_mod, name: ap2_nodeX, extra_config: ap2_nodeX_extra_config, phoenix_port: 4015)
{:ok, _} = Clustered.start(@aux_mod, name: ap2_nodeX, extra_config: ap2_nodeX_extra_config, phoenix_port: 4015)

ap2_nodeY_extra_config =
[{:realtime, :region, "ap-southeast-2"}, {:gen_rpc, :tcp_server_port, 16972}] ++ extra_config
ap2_nodeY_extra_config =
[{:realtime, :region, "ap-southeast-2"}, {:gen_rpc, :tcp_server_port, 16972}] ++ extra_config

{:ok, _} = Clustered.start(@aux_mod, name: ap2_nodeY, extra_config: ap2_nodeY_extra_config, phoenix_port: 4016)
{:ok, _} = Clustered.start(@aux_mod, name: ap2_nodeY, extra_config: ap2_nodeY_extra_config, phoenix_port: 4016)

# Ensuring that syn had enough time to propagate to all nodes the group information
Process.sleep(3000)
# Ensuring that syn had enough time to propagate to all nodes the group information
Process.sleep(3000)

RealtimeWeb.Endpoint.subscribe(@topic)
:erpc.multicall(Node.list(), Subscriber, :subscribe, [self(), @topic])
RealtimeWeb.Endpoint.subscribe(@topic)
:erpc.multicall(Node.list(), Subscriber, :subscribe, [self(), @topic])

assert length(Realtime.Nodes.region_nodes("us-east-1")) == 2
assert length(Realtime.Nodes.region_nodes("ap-southeast-2")) == 2
assert length(Realtime.Nodes.region_nodes("us-east-1")) == 2
assert length(Realtime.Nodes.region_nodes("ap-southeast-2")) == 2

assert_receive {:ready, "us-east-1"}
assert_receive {:ready, "ap-southeast-2"}
assert_receive {:ready, "ap-southeast-2"}
assert_receive {:ready, "us-east-1"}
assert_receive {:ready, "ap-southeast-2"}
assert_receive {:ready, "ap-southeast-2"}

message = %Phoenix.Socket.Broadcast{topic: @topic, event: "an event", payload: ["a", %{"b" => "c"}, 1, 23]}
Phoenix.PubSub.broadcast(Realtime.PubSub, @topic, message)
message = %Phoenix.Socket.Broadcast{topic: @topic, event: "an event", payload: ["a", %{"b" => "c"}, 1, 23]}
Phoenix.PubSub.broadcast(Realtime.PubSub, @topic, message)

assert_receive ^message
assert_receive ^message

# Remote nodes received the broadcast
assert_receive {:relay, :"us_node@127.0.0.1", ^message}, 5000
assert_receive {:relay, :"ap2_nodeX@127.0.0.1", ^message}, 1000
assert_receive {:relay, :"ap2_nodeY@127.0.0.1", ^message}, 1000
refute_receive _any
end
# Remote nodes received the broadcast
assert_receive {:relay, :"us_node@127.0.0.1", ^message}, 5000
assert_receive {:relay, :"ap2_nodeX@127.0.0.1", ^message}, 1000
assert_receive {:relay, :"ap2_nodeY@127.0.0.1", ^message}, 1000
refute_receive _any
end
end
end
Loading