Skip to content

Commit 87d663d

Browse files
authored
fix: tick most RateCounters every 5 seconds instead of every second (#1787)
* fix: ensure RateCounter avg is always per second
* fix: change tenant rate counters to tick every 5 seconds
1 parent fcd4ad2 commit 87d663d

5 files changed

Lines changed: 85 additions & 8 deletions

File tree

lib/realtime/rate_counter/rate_counter.ex

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ defmodule Realtime.RateCounter do
44
55
These rate counters use the GenCounter module.
66
Start your RateCounter here and increment it with a `GenCounter.add/1` call, for example.
7+
8+
Average is calculated as the average number of events per second
79
"""
810

911
use GenServer
@@ -208,7 +210,8 @@ defmodule Realtime.RateCounter do
208210
bucket_len = Enum.count(bucket)
209211

210212
sum = Enum.sum(bucket)
211-
avg = sum / bucket_len
213+
214+
avg = sum / bucket_len / (state.tick / 1_000)
212215

213216
state = %{state | bucket: bucket, sum: sum, avg: avg}
214217

lib/realtime/tenants.ex

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,8 @@ defmodule Realtime.Tenants do
151151
@spec joins_per_second_rate(String.t(), non_neg_integer) :: RateCounter.Args.t()
152152
def joins_per_second_rate(tenant_id, max_joins_per_second) when is_binary(tenant_id) do
153153
opts = [
154+
tick: :timer.seconds(5),
155+
max_bucket_len: 12,
154156
telemetry: %{
155157
event_name: [:channel, :joins],
156158
measurements: %{limit: max_joins_per_second},
@@ -197,6 +199,8 @@ defmodule Realtime.Tenants do
197199

198200
def events_per_second_rate(tenant_id, max_events_per_second) do
199201
opts = [
202+
tick: :timer.seconds(5),
203+
max_bucket_len: 12,
200204
telemetry: %{
201205
event_name: [:channel, :events],
202206
measurements: %{limit: max_events_per_second},
@@ -244,6 +248,8 @@ defmodule Realtime.Tenants do
244248
@spec db_events_per_second_rate(String.t(), non_neg_integer) :: RateCounter.Args.t()
245249
def db_events_per_second_rate(tenant_id, max_events_per_second) when is_binary(tenant_id) do
246250
opts = [
251+
tick: :timer.seconds(5),
252+
max_bucket_len: 12,
247253
telemetry: %{
248254
event_name: [:channel, :db_events],
249255
measurements: %{},
@@ -290,6 +296,8 @@ defmodule Realtime.Tenants do
290296
@spec presence_events_per_second_rate(String.t(), non_neg_integer) :: RateCounter.Args.t()
291297
def presence_events_per_second_rate(tenant_id, max_presence_events_per_second) do
292298
opts = [
299+
tick: :timer.seconds(5),
300+
max_bucket_len: 12,
293301
telemetry: %{
294302
event_name: [:channel, :presence_events],
295303
measurements: %{limit: max_presence_events_per_second},

test/integration/rt_channel/connection_lifecycle_test.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -305,7 +305,7 @@ defmodule Realtime.Integration.RtChannel.ConnectionLifecycleTest do
305305

306306
log =
307307
capture_log(fn ->
308-
for _ <- 1..300 do
308+
for _ <- 1..1500 do
309309
WebsocketClient.join(socket, realtime_topic, %{config: config})
310310
assert_receive %Message{event: "phx_reply", payload: %{"status" => "ok"}, topic: ^realtime_topic}, 500
311311
end

test/realtime/rate_counter/rate_counter_test.exs

Lines changed: 68 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ defmodule Realtime.RateCounterTest do
174174

175175
log =
176176
capture_log(fn ->
177-
GenCounter.add(args.id, 50)
177+
GenCounter.add(args.id, 6)
178178
Process.sleep(300)
179179
end)
180180

@@ -185,7 +185,7 @@ defmodule Realtime.RateCounterTest do
185185
# Splitting by the error message returns the error message and the rest of the log only
186186
assert length(String.split(log, "ErrorMessage: Reason")) == 2
187187

188-
Process.sleep(300)
188+
Process.sleep(400)
189189

190190
assert {:ok, %RateCounter{limit: %{triggered: false}}} = RateCounter.get(args)
191191
end
@@ -301,6 +301,72 @@ defmodule Realtime.RateCounterTest do
301301
end
302302
end
303303

304+
describe "avg normalization" do
305+
test "avg represents events per second regardless of tick interval" do
306+
# 1-second tick: add 10 events → avg should be ~10 events/second
307+
id_1s = {:domain, :metric, Ecto.UUID.generate()}
308+
args_1s = %Args{id: id_1s, opts: [tick: 1_000, max_bucket_len: 1]}
309+
{:ok, pid} = RateCounter.new(args_1s)
310+
# wait for init to complete
311+
:sys.get_state(pid)
312+
313+
GenCounter.add(id_1s, 10)
314+
{:ok, state_1s} = RateCounterHelper.tick!(args_1s)
315+
assert_in_delta state_1s.avg, 10.0, 0.01
316+
317+
# 5-second tick: add 50 events (= 10 per second) → avg should also be ~10 events/second
318+
id_5s = {:domain, :metric, Ecto.UUID.generate()}
319+
args_5s = %Args{id: id_5s, opts: [tick: 5_000, max_bucket_len: 1]}
320+
{:ok, pid} = RateCounter.new(args_5s)
321+
# wait for init to complete
322+
:sys.get_state(pid)
323+
324+
GenCounter.add(id_5s, 50)
325+
{:ok, state_5s} = RateCounterHelper.tick!(args_5s)
326+
assert_in_delta state_5s.avg, 10.0, 0.01
327+
end
328+
329+
test "avg limit triggers and unsets correctly with a non-1-second tick" do
330+
id = {:domain, :metric, Ecto.UUID.generate()}
331+
332+
args = %Args{
333+
id: id,
334+
opts: [
335+
tick: 5_000,
336+
max_bucket_len: 1,
337+
limit: [
338+
value: 10,
339+
measurement: :avg,
340+
log_fn: fn ->
341+
Logger.warning("RateLimitReached", external_id: "tenant123", project: "tenant123")
342+
end
343+
]
344+
]
345+
}
346+
347+
{:ok, pid} = RateCounter.new(args)
348+
# wait for init to complete
349+
:sys.get_state(pid)
350+
351+
# 60 events over a 5-second tick = 12 events/second, above the 10/s limit
352+
log =
353+
capture_log(fn ->
354+
GenCounter.add(id, 60)
355+
RateCounterHelper.tick!(args)
356+
end)
357+
358+
assert {:ok, %RateCounter{avg: avg, limit: %{triggered: true}}} = RateCounter.get(args)
359+
assert_in_delta avg, 12.0, 0.01
360+
assert log =~ "RateLimitReached"
361+
362+
# 40 events over a 5-second tick = 8 events/second, below the 10/s limit
363+
GenCounter.add(id, 40)
364+
RateCounterHelper.tick!(args)
365+
assert {:ok, %RateCounter{avg: avg, limit: %{triggered: false}}} = RateCounter.get(args)
366+
assert_in_delta avg, 8.0, 0.01
367+
end
368+
end
369+
304370
describe "publish_update/1" do
305371
test "cause shutdown with update message from update topic" do
306372
args = %Args{id: {:domain, :metric, Ecto.UUID.generate()}}

test/realtime_web/channels/realtime_channel/presence_handler_test.exs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -450,7 +450,7 @@ defmodule RealtimeWeb.RealtimeChannel.PresenceHandlerTest do
450450

451451
log =
452452
capture_log(fn ->
453-
for _ <- 1..300, do: PresenceHandler.handle(%{"event" => "track"}, db_conn, socket)
453+
for _ <- 1..1500, do: PresenceHandler.handle(%{"event" => "track"}, db_conn, socket)
454454

455455
{:ok, _} = RateCounterHelper.tick!(Tenants.presence_events_per_second_rate(tenant))
456456

@@ -466,7 +466,7 @@ defmodule RealtimeWeb.RealtimeChannel.PresenceHandlerTest do
466466

467467
log =
468468
capture_log(fn ->
469-
for _ <- 1..300, do: PresenceHandler.handle(%{"event" => "track"}, db_conn, socket)
469+
for _ <- 1..1500, do: PresenceHandler.handle(%{"event" => "track"}, db_conn, socket)
470470

471471
{:ok, _} = RateCounterHelper.tick!(Tenants.presence_events_per_second_rate(tenant))
472472

@@ -535,7 +535,7 @@ defmodule RealtimeWeb.RealtimeChannel.PresenceHandlerTest do
535535

536536
log =
537537
capture_log(fn ->
538-
for _ <- 1..300, do: PresenceHandler.handle(%{"event" => "track"}, db_conn, socket)
538+
for _ <- 1..1500, do: PresenceHandler.handle(%{"event" => "track"}, db_conn, socket)
539539

540540
{:ok, _} = RateCounterHelper.tick!(Tenants.presence_events_per_second_rate(tenant))
541541

@@ -552,7 +552,7 @@ defmodule RealtimeWeb.RealtimeChannel.PresenceHandlerTest do
552552

553553
log =
554554
capture_log(fn ->
555-
for _ <- 1..300, do: PresenceHandler.handle(%{"event" => "track"}, db_conn, socket)
555+
for _ <- 1..1500, do: PresenceHandler.handle(%{"event" => "track"}, db_conn, socket)
556556

557557
{:ok, _} = RateCounterHelper.tick!(Tenants.presence_events_per_second_rate(tenant))
558558

0 commit comments

Comments
 (0)