Skip to content

Commit 369acc9

Browse files
authored
fix: log rate limit (#1793)
1 parent 814d474 commit 369acc9

4 files changed

Lines changed: 158 additions & 35 deletions

File tree

config/runtime.exs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ metrics_pusher_auth = System.get_env("METRICS_PUSHER_AUTH")
9191
metrics_pusher_interval_ms = Env.get_integer("METRICS_PUSHER_INTERVAL_MS", :timer.seconds(30))
9292
metrics_pusher_timeout_ms = Env.get_integer("METRICS_PUSHER_TIMEOUT_MS", :timer.seconds(15))
9393
metrics_pusher_compress = Env.get_boolean("METRICS_PUSHER_COMPRESS", true)
94+
log_throttle_janitor_interval_ms = Env.get_integer("LOG_THROTTLE_JANITOR_INTERVAL_IN_MS", :timer.minutes(10))
9495

9596
metrics_pusher_extra_labels =
9697
case System.get_env("METRICS_PUSHER_EXTRA_LABELS", "") do
@@ -177,6 +178,7 @@ config :realtime,
177178
max_calls: client_presence_max_calls,
178179
window_ms: client_presence_window_ms
179180
],
181+
log_throttle_janitor_interval_ms: log_throttle_janitor_interval_ms,
180182
disable_healthcheck_logging: disable_healthcheck_logging,
181183
metrics_pusher_enabled: metrics_pusher_enabled,
182184
metrics_pusher_url: metrics_pusher_url,

lib/realtime/application.ex

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ defmodule Realtime.Application do
44
@moduledoc false
55

66
use Application
7+
require Cachex.Spec
78
require Logger
89

910
alias Realtime.Repo.Replica
@@ -106,7 +107,16 @@ defmodule Realtime.Application do
106107
message_module: Realtime.BeaconPubSubAdapter
107108
]
108109
]},
109-
{Cachex, name: Realtime.RateCounter},
110+
Supervisor.child_spec({Cachex, name: Realtime.RateCounter}, id: Realtime.RateCounter),
111+
Supervisor.child_spec(
112+
{Cachex,
113+
name: Realtime.LogThrottle,
114+
expiration:
115+
Cachex.Spec.expiration(
116+
interval: Application.get_env(:realtime, :log_throttle_janitor_interval_ms, :timer.minutes(10))
117+
)},
118+
id: Realtime.LogThrottle
119+
),
110120
Realtime.Tenants.Cache,
111121
Realtime.RateCounter.DynamicSupervisor,
112122
Realtime.Latency,

lib/realtime_web/channels/realtime_channel/logging.ex

Lines changed: 57 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -36,59 +36,86 @@ defmodule RealtimeWeb.RealtimeChannel.Logging do
3636
end
3737

3838
@doc """
39-
Logs an error if the log level is set to error
39+
Logs an error if the log level is set to error.
40+
41+
Accepts an optional `throttle: {max_count, window_ms}` option to limit
42+
how many times the log is emitted per tenant+code within the given time window.
4043
"""
41-
@spec maybe_log_error(socket :: Phoenix.Socket.t(), code :: binary(), msg :: any()) :: {:error, %{reason: binary}}
42-
def maybe_log_error(socket, code, msg), do: maybe_log(socket, :error, code, msg)
44+
@spec maybe_log_error(socket :: Phoenix.Socket.t(), code :: binary(), msg :: any(), opts :: keyword()) ::
45+
{:error, %{reason: binary}}
46+
def maybe_log_error(socket, code, msg, opts \\ []), do: maybe_log(socket, :error, code, msg, opts)
4347

4448
@doc """
45-
Logs a warning if the log level is set to warning
49+
Logs a warning if the log level is set to warning.
50+
51+
Accepts an optional `throttle: {max_count, window_ms}` option to limit
52+
how many times the log is emitted per tenant+code within the given time window.
4653
"""
47-
@spec maybe_log_warning(socket :: Phoenix.Socket.t(), code :: binary(), msg :: any()) :: {:error, %{reason: binary}}
48-
def maybe_log_warning(socket, code, msg), do: maybe_log(socket, :warning, code, msg)
54+
@spec maybe_log_warning(socket :: Phoenix.Socket.t(), code :: binary(), msg :: any(), opts :: keyword()) ::
55+
{:error, %{reason: binary}}
56+
def maybe_log_warning(socket, code, msg, opts \\ []), do: maybe_log(socket, :warning, code, msg, opts)
4957

5058
@doc """
51-
Logs an info if the log level is set to info
59+
Logs an info if the log level is set to info.
5260
"""
5361
@spec maybe_log_info(socket :: Phoenix.Socket.t(), msg :: any()) :: :ok
54-
def maybe_log_info(socket, msg), do: maybe_log(socket, :info, nil, msg)
62+
def maybe_log_info(socket, msg), do: maybe_log(socket, :info, nil, msg, [])
5563

56-
defp build_msg(code, msg) do
57-
msg = stringify!(msg)
58-
if code, do: "#{code}: #{msg}", else: msg
59-
end
64+
# Builds the final log/reason string.
#
# With no error code the message is just the stringified `msg`; otherwise the
# code is prefixed as "<code>: <msg>".
defp build_msg(code, msg) do
  case code do
    nil -> stringify!(msg)
    _ -> "#{code}: #{stringify!(msg)}"
  end
end
6066

61-
defp log(%{assigns: %{tenant: tenant, access_token: access_token}}, level, code, msg) do
67+
defp log(%{assigns: assigns}, level, code, msg) do
68+
tenant = assigns.tenant
6269
Logger.metadata(external_id: tenant, project: tenant)
63-
if level in [:error, :warning], do: update_metadata_with_token_claims(access_token)
70+
enrich_metadata(level, Map.get(assigns, :access_token))
6471
Logger.log(level, msg, error_code: code)
65-
if level in [:error], do: emit_system_error(level, code, tenant)
72+
emit_telemetry(level, code, tenant)
6673
end
6774

68-
defp maybe_log(%{assigns: %{log_level: log_level}} = socket, level, code, msg) do
69-
msg = build_msg(code, msg)
70-
if Logger.compare_levels(log_level, level) != :gt, do: log(socket, level, code, msg)
71-
if level in [:error, :warning], do: {:error, %{reason: msg}}, else: :ok
75+
# Adds token-claim metadata to the Logger metadata, but only for the
# :error and :warning levels; every other level is a no-op returning :ok.
defp enrich_metadata(level, token) do
  if level in [:error, :warning] do
    update_metadata_with_token_claims(token)
  else
    :ok
  end
end
79+
80+
# Emits the [:realtime, :channel, :error] telemetry event (count: 1, tagged
# with the error code and tenant) for :error-level logs. Any other level
# produces no event and returns :ok.
defp emit_telemetry(level, code, tenant) do
  case level do
    :error ->
      Telemetry.execute([:realtime, :channel, :error], %{count: 1}, %{code: code, tenant: tenant})

    _ ->
      :ok
  end
end
84+
85+
# Logs `msg` at `level` when the socket's configured log level allows it, then
# returns the caller-facing result:
#
#   * `{:error, %{reason: built_msg}}` for :error and :warning
#   * `:ok` for every other level
#
# The return value does not depend on whether the line was actually logged.
defp maybe_log(%{assigns: %{log_level: log_level}} = socket, level, code, msg, opts) do
  built_msg = build_msg(code, msg)

  # Only skip logging when the socket's level is strictly above the requested one.
  case Logger.compare_levels(log_level, level) do
    :gt -> nil
    _ -> do_log(socket, level, code, built_msg, opts)
  end

  case level do
    lvl when lvl in [:error, :warning] -> {:error, %{reason: built_msg}}
    _ -> :ok
  end
end
7390

74-
defp emit_system_error(level, code, tenant_id),
75-
do: Telemetry.execute([:realtime, :channel, level], %{count: 1}, %{code: code, tenant: tenant_id})
91+
# Emits the log line, optionally rate-limited.
#
# `opts` may contain `throttle: {max_count, window_ms}`. Without it the message
# is always logged. With it, at most `max_count` messages are logged per
# `{tenant, level, code}` key within `window_ms`; calls over the limit skip the
# log line but still go through `emit_telemetry/3`.
#
# The option is read with `Keyword.get/2` so that unrelated keys in `opts` do
# not cause a FunctionClauseError on the logging path.
defp do_log(socket, level, code, msg, opts) do
  case Keyword.get(opts, :throttle) do
    nil -> log(socket, level, code, msg)
    {max_count, window_ms} -> throttled_log(socket, level, code, msg, {max_count, window_ms})
  end
end

# Logs when the throttle still has room for this key, otherwise only emits telemetry.
defp throttled_log(%{assigns: %{tenant: tenant}} = socket, level, code, msg, {max_count, window_ms}) do
  key = {tenant, level, code}

  if throttle_allows?(key, max_count, window_ms) do
    log(socket, level, code, msg)
  else
    emit_telemetry(level, code, tenant)
  end
end

# Returns true when the caller may log for `key`.
#
# A plain read is tried first: once the limit is reached, the hot path does no
# cache writes. Below the limit the decision is taken on the value returned by
# `Cachex.incr/2` rather than on a separate get + put/incr, so concurrent
# callers cannot all observe the same stale count and exceed `max_count`.
# NOTE(review): this relies on Cachex.incr/2 being an atomic counter update —
# confirm against the Cachex version in use.
defp throttle_allows?(key, max_count, window_ms) do
  case Cachex.get(Realtime.LogThrottle, key) do
    {:ok, count} when is_integer(count) and count >= max_count ->
      false

    _ ->
      case Cachex.incr(Realtime.LogThrottle, key) do
        {:ok, 1} ->
          # First hit in this window: start the expiration clock for the key.
          Cachex.expire(Realtime.LogThrottle, key, window_ms)
          max_count >= 1

        {:ok, count} ->
          count <= max_count

        {:error, _reason} ->
          # Fail open: a broken throttle cache must not hide errors from the logs.
          true
      end
  end
end
76109

77110
defp stringify!(msg) when is_binary(msg), do: msg
78111
defp stringify!(msg), do: inspect(msg, pretty: true)
79112

80-
defp update_metadata_with_token_claims(nil), do: nil
113+
defp update_metadata_with_token_claims(nil), do: :ok
81114

82115
defp update_metadata_with_token_claims(token) do
83116
case Joken.peek_claims(token) do
84-
{:ok, claims} ->
85-
sub = Map.get(claims, "sub")
86-
exp = Map.get(claims, "exp")
87-
iss = Map.get(claims, "iss")
88-
Logger.metadata(sub: sub, exp: exp, iss: iss)
89-
90-
_ ->
91-
nil
117+
{:ok, claims} -> Logger.metadata(sub: claims["sub"], exp: claims["exp"], iss: claims["iss"])
118+
_ -> :ok
92119
end
93120
end
94121
end

test/realtime_web/channels/realtime_channel/logging_test.exs

Lines changed: 88 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
defmodule RealtimeWeb.RealtimeChannel.LoggingTest do
2-
# async: false due to changes in Logger levels
2+
# async: false due to changes in Logger levels and shared Cachex state
33
use Realtime.DataCase, async: false
44
import ExUnit.CaptureLog
55
alias RealtimeWeb.RealtimeChannel.Logging
@@ -11,6 +11,7 @@ defmodule RealtimeWeb.RealtimeChannel.LoggingTest do
1111
level = Logger.level()
1212
Logger.configure(level: :info)
1313
tenant = tenant_fixture()
14+
Cachex.clear(Realtime.LogThrottle)
1415

1516
on_exit(fn ->
1617
:telemetry.detach(__MODULE__)
@@ -62,7 +63,7 @@ defmodule RealtimeWeb.RealtimeChannel.LoggingTest do
6263
end
6364
end
6465

65-
describe "maybe_log_error/3" do
66+
describe "maybe_log_error/4" do
6667
test "logs error message when log_level is less or equal to error" do
6768
log_levels = [:debug, :info, :warning, :error]
6869

@@ -102,8 +103,8 @@ defmodule RealtimeWeb.RealtimeChannel.LoggingTest do
102103
end
103104
end
104105

105-
describe "maybe_log_warning/3" do
106-
test "logs error message when log_level is less or equal to warning" do
106+
describe "maybe_log_warning/4" do
107+
test "logs warning message when log_level is less or equal to warning" do
107108
log_levels = [:debug, :info, :warning]
108109

109110
for log_level <- log_levels do
@@ -194,4 +195,87 @@ defmodule RealtimeWeb.RealtimeChannel.LoggingTest do
194195
log = capture_log(fn -> Logging.maybe_log_error(socket, "TestError", "test error") end)
195196
assert log =~ tenant_id
196197
end
198+
199+
describe "throttle option" do
200+
test "logs exactly max_count times within the window but always emits telemetry" do
201+
tenant_id = random_string()
202+
socket = %{assigns: %{log_level: :error, tenant: tenant_id, access_token: "test_token"}}
203+
204+
logs =
205+
capture_log(fn ->
206+
for _ <- 1..5 do
207+
Logging.maybe_log_error(socket, "ThrottleCode", "msg", throttle: {3, :timer.seconds(60)})
208+
end
209+
end)
210+
211+
assert logs |> String.split("ThrottleCode: msg") |> length() == 4
212+
213+
for _ <- 1..5 do
214+
assert_receive {[:realtime, :channel, :error], %{count: 1}, %{code: "ThrottleCode", tenant: ^tenant_id}}
215+
end
216+
end
217+
218+
test "still returns {:error, reason} even when throttled" do
219+
tenant_id = random_string()
220+
socket = %{assigns: %{log_level: :error, tenant: tenant_id, access_token: "test_token"}}
221+
222+
for _ <- 1..5 do
223+
assert Logging.maybe_log_error(socket, "ThrottleCode", "msg", throttle: {2, :timer.seconds(60)}) ==
224+
{:error, %{reason: "ThrottleCode: msg"}}
225+
end
226+
end
227+
228+
test "resets after the window expires" do
229+
tenant_id = random_string()
230+
socket = %{assigns: %{log_level: :error, tenant: tenant_id, access_token: "test_token"}}
231+
232+
logs_before =
233+
capture_log(fn ->
234+
for _ <- 1..3, do: Logging.maybe_log_error(socket, "WindowCode", "msg", throttle: {2, 200})
235+
end)
236+
237+
assert logs_before |> String.split("WindowCode: msg") |> length() == 3
238+
239+
Process.sleep(400)
240+
241+
logs_after =
242+
capture_log(fn ->
243+
for _ <- 1..3, do: Logging.maybe_log_error(socket, "WindowCode", "msg", throttle: {2, 200})
244+
end)
245+
246+
assert logs_after |> String.split("WindowCode: msg") |> length() == 3
247+
end
248+
249+
test "different tenant+code pairs have independent counters" do
250+
socket_a = %{assigns: %{log_level: :error, tenant: random_string(), access_token: "t"}}
251+
socket_b = %{assigns: %{log_level: :error, tenant: random_string(), access_token: "t"}}
252+
253+
logs =
254+
capture_log(fn ->
255+
for _ <- 1..3 do
256+
Logging.maybe_log_error(socket_a, "CodeA", "msg", throttle: {2, :timer.seconds(60)})
257+
Logging.maybe_log_error(socket_b, "CodeB", "msg", throttle: {2, :timer.seconds(60)})
258+
end
259+
end)
260+
261+
assert logs |> String.split("CodeA: msg") |> length() == 3
262+
assert logs |> String.split("CodeB: msg") |> length() == 3
263+
end
264+
265+
test "concurrent callers do not exceed max_count" do
266+
tenant_id = random_string()
267+
socket = %{assigns: %{log_level: :error, tenant: tenant_id, access_token: "test_token"}}
268+
269+
logs =
270+
capture_log(fn ->
271+
1..20
272+
|> Task.async_stream(fn _ ->
273+
Logging.maybe_log_error(socket, "ConcurrentCode", "msg", throttle: {5, :timer.seconds(60)})
274+
end)
275+
|> Stream.run()
276+
end)
277+
278+
assert logs |> String.split("ConcurrentCode: msg") |> length() <= 6
279+
end
280+
end
197281
end

0 commit comments

Comments
 (0)