Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions lib/realtime/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,14 @@ defmodule Realtime.Application do

:ets.new(Realtime.Tenants.Connect, [:named_table, :set, :public])

:ets.new(:log_rate_limiter, [
:set,
:public,
:named_table,
{:decentralized_counters, true},
{:write_concurrency, :auto}
])

set_persist_storage(RealtimeWeb.UserSocket, :realtime, :websocket_max_heap_size)
set_persist_storage(RealtimeWeb.UserSocket, :realtime, :measure_traffic_interval_in_ms)

Expand Down
4 changes: 2 additions & 2 deletions lib/realtime_web/channels/realtime_channel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ defmodule RealtimeWeb.RealtimeChannel do

{:error, :too_many_connections} ->
msg = "Too many connected users"
log_error(socket, "ConnectionRateLimitReached", msg)
log_error(socket, "ConnectionRateLimitReached", msg, max: 1, window_ms: 60_000)

{:error, :too_many_joins} ->
msg = "ClientJoinRateLimitReached: Too many joins per second"
Expand Down Expand Up @@ -216,7 +216,7 @@ defmodule RealtimeWeb.RealtimeChannel do
log_error(socket, "TenantNotFound", "Tenant with the given ID does not exist")

{:error, :tenant_suspended} ->
log_error(socket, "RealtimeDisabledForTenant", "Realtime disabled for this tenant")
log_error(socket, "RealtimeDisabledForTenant", "Realtime disabled for this tenant", max: 1, window_ms: 60_000)

{:error, :signature_error} ->
log_error(socket, "JwtSignatureError", "Failed to validate JWT signature")
Expand Down
62 changes: 46 additions & 16 deletions lib/realtime_web/channels/realtime_channel/logging.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ defmodule RealtimeWeb.RealtimeChannel.Logging do
alias Realtime.Telemetry
require Logger

# Name of the ETS table holding the per-{tenant, code} log counters that
# rate_limited?/3 reads and updates.
@log_rate_table :log_rate_limiter
# Number of log entries allowed per window when the caller passes no :max option.
@log_rate_default_max 10
# Window length in milliseconds (300_000 ms = 5 minutes) used when the caller
# passes no :window_ms option.
@log_rate_default_window_ms 300_000

defmacro __using__(_opts) do
quote do
require Logger
Expand All @@ -18,60 +22,86 @@ defmodule RealtimeWeb.RealtimeChannel.Logging do
"""
# `opts` is forwarded to the private log helper, where `:max` and `:window_ms`
# tune the per-{tenant, code} log rate limit.
@spec log_error(socket :: Phoenix.Socket.t(), code :: binary(), msg :: any(), opts :: keyword()) ::
        {:error, %{reason: binary}}
def log_error(socket, code, msg) do
def log_error(socket, code, msg, opts \\ []) do
msg = build_msg(code, msg)
emit_system_error(:error, code)
log(socket, :error, code, msg)
log(socket, :error, code, msg, opts)
{:error, %{reason: msg}}
end

@doc """
Logs a warning message
"""
@spec log_warning(socket :: Phoenix.Socket.t(), code :: binary(), msg :: any()) ::
@spec log_warning(socket :: Phoenix.Socket.t(), code :: binary(), msg :: any(), opts :: keyword()) ::
{:error, %{reason: binary}}
def log_warning(socket, code, msg) do
def log_warning(socket, code, msg, opts \\ []) do
msg = build_msg(code, msg)
log(socket, :warning, code, msg)
log(socket, :warning, code, msg, opts)
{:error, %{reason: msg}}
end

@doc """
Logs an error if the log level is set to error
"""
@spec maybe_log_error(socket :: Phoenix.Socket.t(), code :: binary(), msg :: any()) :: {:error, %{reason: binary}}
def maybe_log_error(socket, code, msg), do: maybe_log(socket, :error, code, msg)
@spec maybe_log_error(socket :: Phoenix.Socket.t(), code :: binary(), msg :: any(), opts :: keyword()) ::
        {:error, %{reason: binary}}
def maybe_log_error(socket, code, msg, opts \\ []) do
  # Same path as the other maybe_log_* helpers, with the level fixed to :error.
  maybe_log(socket, :error, code, msg, opts)
end

@doc """
Logs a warning if the log level is set to warning
"""
@spec maybe_log_warning(socket :: Phoenix.Socket.t(), code :: binary(), msg :: any()) :: {:error, %{reason: binary}}
def maybe_log_warning(socket, code, msg), do: maybe_log(socket, :warning, code, msg)
@spec maybe_log_warning(socket :: Phoenix.Socket.t(), code :: binary(), msg :: any(), opts :: keyword()) ::
        {:error, %{reason: binary}}
def maybe_log_warning(socket, code, msg, opts \\ []) do
  # Same path as the other maybe_log_* helpers, with the level fixed to :warning.
  maybe_log(socket, :warning, code, msg, opts)
end

@doc """
Logs an info if the log level is set to info
"""
@spec maybe_log_info(socket :: Phoenix.Socket.t(), msg :: any()) :: :ok
def maybe_log_info(socket, msg), do: maybe_log(socket, :info, nil, msg)
def maybe_log_info(socket, msg) do
  # Info entries are logged without an error code (nil) and with no rate-limit options.
  maybe_log(socket, :info, nil, msg, [])
end

# Builds the final log text: the stringified message, prefixed with "<code>: "
# whenever a truthy code is given.
defp build_msg(code, msg) do
  text = stringify!(msg)

  case code do
    # nil and false are the only falsy values, so they are the "no code" case.
    absent when absent in [nil, false] -> text
    _present -> "#{code}: #{text}"
  end
end

defp log(%{assigns: %{tenant: tenant, access_token: access_token}}, level, code, msg) do
Logger.metadata(external_id: tenant, project: tenant)
if level in [:error, :warning], do: update_metadata_with_token_claims(access_token)
Logger.log(level, msg, error_code: code)
# Writes one log entry for the socket's tenant, unless the {tenant, code} pair
# is currently rate limited.
#
# - `level` is the Logger level of the entry.
# - `code` is attached to the entry as `error_code` metadata and, together with
#   the tenant, forms the rate-limit key.
# - `opts` is passed to rate_limited?/3 (`:max`, `:window_ms`).
#
# Returns `nil` when the entry is suppressed, otherwise the result of `Logger.log/3`.
defp log(%{assigns: %{tenant: tenant, access_token: access_token}}, level, code, msg, opts) do
  # `if not` replaces the soft-deprecated `unless`; rate_limited?/3 always
  # returns a boolean, so the strict `not` is safe here.
  if not rate_limited?(tenant, code, opts) do
    Logger.metadata(external_id: tenant, project: tenant)
    # Token claims are only added to the metadata for error and warning entries.
    if level in [:error, :warning], do: update_metadata_with_token_claims(access_token)
    Logger.log(level, msg, error_code: code)
  end
end

defp maybe_log(%{assigns: %{log_level: log_level}} = socket, level, code, msg) do
defp maybe_log(%{assigns: %{log_level: log_level}} = socket, level, code, msg, opts) do
msg = build_msg(code, msg)
emit_system_error(level, code)
if Logger.compare_levels(log_level, level) != :gt, do: log(socket, level, code, msg)
if Logger.compare_levels(log_level, level) != :gt, do: log(socket, level, code, msg, opts)
if level in [:error, :warning], do: {:error, %{reason: msg}}, else: :ok
end

# Decides whether a log entry for `{tenant, code}` must be dropped.
#
# Entries without a code (`nil`) are never rate limited.
defp rate_limited?(_tenant, nil, _opts), do: false

# Counts this call against the `{tenant, code}` row of the rate-limit table and
# returns `true` once more than `:max` entries were requested within the current
# window of `:window_ms` milliseconds. Both options fall back to the module
# defaults. When the window has elapsed, the row is reset and the call is allowed.
#
# NOTE(review): one row is kept per distinct `{tenant, code}` pair and is reused
# across windows (reset in place). Nothing in this function deletes rows, so the
# table grows with the number of distinct pairs seen — confirm whether cleanup
# happens elsewhere.
defp rate_limited?(tenant, code, opts) do
  max = Keyword.get(opts, :max, @log_rate_default_max)
  window_ms = Keyword.get(opts, :window_ms, @log_rate_default_window_ms)
  key = {tenant, code}
  now = :erlang.monotonic_time(:millisecond)

  # Atomic increment; inserts `{key, 0, now}` first when the row does not exist yet.
  count = :ets.update_counter(@log_rate_table, key, {2, 1}, {key, 0, now})

  # The lookup is a separate, non-atomic step: another process may have bumped or
  # reset the counter in between. The stored counter is therefore NOT pinned to
  # `count`; pinning it made the `case` raise CaseClauseError under concurrency.
  # Only the window start is taken from the row; the decision uses our own `count`.
  case :ets.lookup(@log_rate_table, key) do
    [{_key, _stored_count, window_start}] when now - window_start >= window_ms ->
      # Window elapsed: start a new one and count this call as its first entry.
      :ets.insert(@log_rate_table, {key, 1, now})
      false

    [{_key, _stored_count, _window_start}] ->
      count > max

    [] ->
      # Row vanished between the two calls; fail open and let the entry through.
      false
  end
end

@system_errors [
"UnableToSetPolicies",
"InitializingProjectConnection",
Expand Down
8 changes: 6 additions & 2 deletions lib/realtime_web/channels/user_socket.ex
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,11 @@ defmodule RealtimeWeb.UserSocket do
{:error, :tenant_not_found}

%Tenant{suspend: true} ->
Logging.log_error(socket, "RealtimeDisabledForTenant", "Realtime disabled for this tenant")
Logging.log_error(socket, "RealtimeDisabledForTenant", "Realtime disabled for this tenant",
max: 1,
window_ms: 60_000
)

{:error, :tenant_suspended}

{:error, :expired_token, msg} ->
Expand All @@ -126,7 +130,7 @@ defmodule RealtimeWeb.UserSocket do

{:error, :too_many_connections} ->
msg = "Too many connected users"
Logging.log_error(socket, "ConnectionRateLimitReached", msg)
Logging.log_error(socket, "ConnectionRateLimitReached", msg, max: 1, window_ms: 60_000)
{:error, :too_many_connections}

{:error, :too_many_joins} ->
Expand Down
56 changes: 56 additions & 0 deletions test/realtime_web/channels/realtime_channel/logging_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -195,4 +195,60 @@ defmodule RealtimeWeb.RealtimeChannel.LoggingTest do
log = capture_log(fn -> Logging.maybe_log_error(socket, "TestError", "test error") end)
assert log =~ tenant_id
end

describe "rate limiting" do
  # Every test builds its socket with a random tenant id; the limiter keys its
  # counters by {tenant, code}, so each test works on its own rows (assuming
  # random_string/0 yields distinct values).

  test "log_error stops logging after max is exceeded for a tenant+code" do
    tenant = random_string()
    socket = %{assigns: %{log_level: :error, tenant: tenant, access_token: "test_token"}}

    output =
      capture_log(fn ->
        Enum.each(1..5, fn _ ->
          Logging.log_error(socket, "RateLimitCode", "msg", max: 3, window_ms: 60_000)
        end)
      end)

    # Splitting on the message yields one more piece than there are occurrences,
    # so 4 pieces means exactly 3 entries were written.
    pieces = String.split(output, "RateLimitCode: msg")
    assert length(pieces) == 4
  end

  test "different codes for same tenant have independent limits" do
    tenant = random_string()
    socket = %{assigns: %{log_level: :error, tenant: tenant, access_token: "test_token"}}

    output =
      capture_log(fn ->
        Enum.each(1..2, fn _ ->
          Logging.log_error(socket, "CodeA", "msg", max: 1, window_ms: 60_000)
        end)

        Enum.each(1..2, fn _ ->
          Logging.log_error(socket, "CodeB", "msg", max: 1, window_ms: 60_000)
        end)
      end)

    # Two pieces per code means one written entry per code.
    assert length(String.split(output, "CodeA: msg")) == 2
    assert length(String.split(output, "CodeB: msg")) == 2
  end

  test "after window expires, logging resumes" do
    tenant = random_string()
    socket = %{assigns: %{log_level: :error, tenant: tenant, access_token: "test_token"}}
    counter_key = {tenant, "ExpiryCode"}

    # Exceed the limit of 2 within the window.
    capture_log(fn ->
      Enum.each(1..3, fn _ ->
        Logging.log_error(socket, "ExpiryCode", "msg", max: 2, window_ms: 60_000)
      end)
    end)

    # Overwrite the row with a window that started 120 s ago, older than the 60 s window.
    expired_start = :erlang.monotonic_time(:millisecond) - 120_000
    :ets.insert(:log_rate_limiter, {counter_key, 5, expired_start})

    output =
      capture_log(fn ->
        Logging.log_error(socket, "ExpiryCode", "msg", max: 2, window_ms: 60_000)
      end)

    assert output =~ "ExpiryCode: msg"
  end

  test "maybe_log_info is never rate limited" do
    tenant = random_string()
    socket = %{assigns: %{log_level: :info, tenant: tenant, access_token: "test_token"}}

    output =
      capture_log(fn ->
        Enum.each(1..5, fn _ -> Logging.maybe_log_info(socket, "info message") end)
      end)

    # Six pieces means all five info entries were written.
    assert length(String.split(output, "info message")) == 6
  end
end
end
Loading