Skip to content

Commit 7e072bc

Browse files
committed
Rate limit: Implement queueing mechanism
Token buckets form the basis of the rate limiting, but on their own they lack a way of enabling fairness when waiting. For this, we add a separate queuing mechanism which registers callbacks for requests that are waiting on a token bucket to refill. A worker thread is tasked with awakening requests as enough tokens are placed back in the bucket. We expose an interface for submitting requests to a queue, both in synchronous and asynchronous mode. Signed-off-by: Christian Pardillo Laursen <christian.pardillolaursen@citrix.com>
1 parent 9d76c1b commit 7e072bc

2 files changed

Lines changed: 193 additions & 0 deletions

File tree

Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
(*
2+
* Copyright (C) 2025 Cloud Software Group
3+
*
4+
* This program is free software; you can redistribute it and/or modify
5+
* it under the terms of the GNU Lesser General Public License as published
6+
* by the Free Software Foundation; version 2.1 only. with the special
7+
* exception on linking described in file LICENSE.
8+
*
9+
* This program is distributed in the hope that it will be useful,
10+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
* GNU Lesser General Public License for more details.
13+
*)
14+
15+
module D = Debug.Make (struct let name = __MODULE__ end)
16+
17+
type t = {
18+
bucket: Token_bucket.t
19+
; process_queue:
20+
(float * (unit -> unit)) Queue.t (* contains token cost and callback *)
21+
; process_queue_lock: Mutex.t
22+
; worker_thread_cond: Condition.t
23+
; should_terminate: bool Atomic.t
24+
(* Signal termination to worker thread. The worker thread will
25+
process all remaining items in the queue before exiting. *)
26+
; worker_thread: Thread.t
27+
}
28+
29+
let with_lock = Xapi_stdext_threads.Threadext.Mutex.execute
30+
31+
(* The worker thread is responsible for calling the callback when the token
32+
amount becomes available *)
33+
let rec worker_loop ~bucket ~process_queue ~process_queue_lock
34+
~worker_thread_cond ~should_terminate =
35+
let process_item cost callback =
36+
Token_bucket.delay_then_consume bucket cost ;
37+
callback ()
38+
in
39+
let item_opt =
40+
with_lock process_queue_lock (fun () ->
41+
while
42+
Queue.is_empty process_queue && not (Atomic.get should_terminate)
43+
do
44+
Condition.wait worker_thread_cond process_queue_lock
45+
done ;
46+
Queue.take_opt process_queue
47+
)
48+
in
49+
match item_opt with
50+
| None ->
51+
(* Queue is empty only when termination was signalled *)
52+
D.debug "%s: queue empty in deleted rate limiter; exiting" __FUNCTION__
53+
| Some (cost, callback) ->
54+
process_item cost callback ;
55+
worker_loop ~bucket ~process_queue ~process_queue_lock ~worker_thread_cond
56+
~should_terminate
57+
58+
let create ~burst_size ~fill_rate =
59+
let bucket = Token_bucket.create ~burst_size ~fill_rate in
60+
let process_queue = Queue.create () in
61+
let process_queue_lock = Mutex.create () in
62+
let worker_thread_cond = Condition.create () in
63+
let should_terminate = Atomic.make false in
64+
let worker_thread =
65+
Thread.create
66+
(fun () ->
67+
worker_loop ~bucket ~process_queue ~process_queue_lock
68+
~worker_thread_cond ~should_terminate
69+
)
70+
()
71+
in
72+
{
73+
bucket
74+
; process_queue
75+
; process_queue_lock
76+
; worker_thread_cond
77+
; should_terminate
78+
; worker_thread
79+
}
80+
81+
let delete data =
82+
if Atomic.compare_and_set data.should_terminate false true then
83+
Condition.signal data.worker_thread_cond ;
84+
Thread.join data.worker_thread
85+
86+
let check_not_terminated should_terminate =
87+
if Atomic.get should_terminate then
88+
invalid_arg "Rate_limit: submit called on a deleted rate limiter"
89+
90+
(* The callback should return quickly - if it is a longer task it is
91+
responsible for creating a thread to do the task *)
92+
let submit_async
93+
{
94+
bucket
95+
; process_queue
96+
; process_queue_lock
97+
; worker_thread_cond
98+
; should_terminate
99+
; _
100+
} ~callback amount =
101+
check_not_terminated should_terminate ;
102+
let run_immediately =
103+
with_lock process_queue_lock (fun () ->
104+
let immediate =
105+
Queue.is_empty process_queue && Token_bucket.consume bucket amount
106+
in
107+
if not immediate then (
108+
Queue.add (amount, callback) process_queue ;
109+
Condition.signal worker_thread_cond
110+
) ;
111+
immediate
112+
)
113+
in
114+
if run_immediately then
115+
callback ()
116+
else
117+
D.debug "%s: rate limiting call" __FUNCTION__
118+
119+
(* Block and execute on the same thread *)
120+
let submit_sync bucket_data ~callback amount =
121+
check_not_terminated bucket_data.should_terminate ;
122+
let channel_opt =
123+
with_lock bucket_data.process_queue_lock (fun () ->
124+
if
125+
Queue.is_empty bucket_data.process_queue
126+
&& Token_bucket.consume bucket_data.bucket amount
127+
then
128+
None
129+
(* Can run callback immediately after releasing lock *)
130+
else
131+
(* Rate limited, need to retrieve function result via channel *)
132+
let channel = Event.new_channel () in
133+
Queue.add
134+
(amount, fun () -> Event.sync (Event.send channel ()))
135+
bucket_data.process_queue ;
136+
Condition.signal bucket_data.worker_thread_cond ;
137+
Some channel
138+
)
139+
in
140+
match channel_opt with
141+
| None ->
142+
callback ()
143+
| Some channel ->
144+
D.debug "%s: rate limiting call" __FUNCTION__ ;
145+
Event.sync (Event.receive channel) ;
146+
callback ()
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
(*
2+
* Copyright (C) 2025 Cloud Software Group
3+
*
4+
* This program is free software; you can redistribute it and/or modify
5+
* it under the terms of the GNU Lesser General Public License as published
6+
* by the Free Software Foundation; version 2.1 only. with the special
7+
* exception on linking described in file LICENSE.
8+
*
9+
* This program is distributed in the hope that it will be useful,
10+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
* GNU Lesser General Public License for more details.
13+
*)
14+
15+
(** Rate limiter built on top of a token bucket. Provides async and sync
16+
submission of callbacks that are rate-limited. Uses a worker thread
17+
to process queued callbacks when tokens become available. *)
18+
19+
type t
20+
21+
val create : burst_size:float -> fill_rate:float -> t
22+
(** [create ~burst_size ~fill_rate] creates a new rate limiter with the given
23+
token bucket parameters.
24+
@raises Invalid_argument if the parameters are invalid
25+
(e.g. non-positive fill rate).
26+
@param burst_size Maximum number of tokens in the bucket
27+
@param fill_rate Number of tokens added per second *)
28+
29+
val delete : t -> unit
30+
(** [delete t] signals the worker thread to terminate. The worker thread
31+
processes any remaining queued callbacks, then exits. Blocks the caller
32+
until the worker thread has finished. Subsequent calls to [submit_async]
33+
or [submit_sync] will raise [Invalid_argument]. *)
34+
35+
val submit_async : t -> callback:(unit -> unit) -> float -> unit
36+
(** [submit_async t ~callback amount] submits a callback under rate limiting.
37+
If tokens are immediately available and no callbacks are queued, the
38+
callback runs synchronously on the calling thread. Otherwise it is
39+
enqueued and will be executed by a worker thread when tokens become
40+
available. Returns immediately. *)
41+
42+
val submit_sync : t -> callback:(unit -> 'a) -> float -> 'a
43+
(** [submit_sync t ~callback amount] submits a callback under rate limiting
44+
and blocks until it completes, returning the callback's result. If tokens
45+
are immediately available and no callbacks are queued, the callback runs
46+
directly. Otherwise, the caller blocks until the worker thread signals
47+
that tokens are available. *)

0 commit comments

Comments
 (0)