Skip to content

Commit 69ee866

Browse files
committed
Rate limit: Implement queueing mechanism
Token buckets form the basis of the rate limiting, but on their own they lack a way of enabling fairness when waiting. For this, we add a separate queuing mechanism which registers callbacks for requests that are waiting on a token bucket to refill. A worker thread is tasked with awakening requests as enough tokens are placed back in the bucket. We expose an interface for submitting requests to a queue, both in synchronous and asynchronous mode. Signed-off-by: Christian Pardillo Laursen <christian.pardillolaursen@citrix.com>
1 parent 25d74d2 commit 69ee866

2 files changed

Lines changed: 178 additions & 0 deletions

File tree

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
(*
2+
* Copyright (C) 2025 Cloud Software Group
3+
*
4+
* This program is free software; you can redistribute it and/or modify
5+
* it under the terms of the GNU Lesser General Public License as published
6+
* by the Free Software Foundation; version 2.1 only. with the special
7+
* exception on linking described in file LICENSE.
8+
*
9+
* This program is distributed in the hope that it will be useful,
10+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
* GNU Lesser General Public License for more details.
13+
*)
14+
15+
module D = Debug.Make (struct let name = "rate_limit" end)
16+
17+
(* State of one rate limiter: a token bucket, the FIFO of requests waiting on
   that bucket, and the worker thread that serves the FIFO. *)
type t = {
    bucket: Token_bucket.t;
    (* Pending requests, oldest first; each entry pairs the token cost of the
       request with the callback to invoke once that cost has been paid. *)
    process_queue: (float * (unit -> unit)) Queue.t;
    (* Held around every access to [process_queue] and around the write to
       [should_terminate]. *)
    process_queue_lock: Mutex.t;
    (* Signalled after a request is queued and when termination is requested;
       the worker thread waits on it while the queue is empty. *)
    worker_thread_cond: Condition.t;
    (* Set to [true] to tell the worker thread to stop once the queue is
       empty. *)
    should_terminate: bool ref;
    (* Thread running the worker loop for this limiter. *)
    worker_thread: Thread.t;
}
26+
27+
(* Shorthand for [Xapi_stdext_threads.Threadext.Mutex.execute], used in this
   file as [with_lock mutex (fun () -> ...)].
   NOTE(review): presumably runs the function with [mutex] held and releases it
   afterwards, also when the function raises - confirm against Threadext. *)
let with_lock = Xapi_stdext_threads.Threadext.Mutex.execute
28+
29+
(* Body of the worker thread: repeatedly take the oldest queued request, pay
   its token cost through [Token_bucket.delay_then_consume] (presumably
   blocking until the bucket holds enough tokens), then invoke its callback.

   The loop sleeps on [worker_thread_cond] while the queue is empty. It
   returns only when the queue is empty and [should_terminate] is set, so
   requests that were queued before termination was requested are still
   served.

   An exception escaping a callback is logged and dropped. Letting it
   propagate would end the only worker thread and leave every request queued
   afterwards waiting forever. *)
let rec worker_loop ~bucket ~process_queue ~process_queue_lock
    ~worker_thread_cond ~should_terminate =
  let process_item cost callback =
    Token_bucket.delay_then_consume bucket cost ;
    try callback ()
    with e ->
      D.error "rate limit: queued callback raised %s" (Printexc.to_string e)
  in
  let item_opt =
    with_lock process_queue_lock (fun () ->
        (* Re-check the predicate after every wake-up: a signal only means
           that the queue or the termination flag may have changed. *)
        while Queue.is_empty process_queue && not !should_terminate do
          Condition.wait worker_thread_cond process_queue_lock
        done ;
        Queue.take_opt process_queue
    )
  in
  match item_opt with
  | None ->
      (* The wait loop exits with an empty queue only when termination was
         signalled *)
      ()
  | Some (cost, callback) ->
      (* The item is processed after the lock has been released, so new
         requests can be queued while the worker waits for tokens. The
         recursive call stays outside the exception handler so that it
         remains a tail call. *)
      process_item cost callback ;
      worker_loop ~bucket ~process_queue ~process_queue_lock ~worker_thread_cond
        ~should_terminate
53+
54+
(* [create ~burst_size ~fill_rate] builds a rate limiter around a fresh token
   bucket and starts its worker thread.

   Raises [Failure "Invalid token bucket parameters"] when
   [Token_bucket.create] rejects the parameters (returns [None]); in that
   case no queue or thread is created. *)
let create ~burst_size ~fill_rate =
  let bucket =
    match Token_bucket.create ~burst_size ~fill_rate with
    | Some b ->
        b
    | None ->
        failwith "Invalid token bucket parameters"
  in
  let process_queue = Queue.create () in
  let process_queue_lock = Mutex.create () in
  let worker_thread_cond = Condition.create () in
  let should_terminate = ref false in
  (* The worker shares the queue, lock, condition and flag stored in the
     returned record. *)
  let run_worker () =
    worker_loop ~bucket ~process_queue ~process_queue_lock ~worker_thread_cond
      ~should_terminate
  in
  let worker_thread = Thread.create run_worker () in
  {
    bucket
  ; process_queue
  ; process_queue_lock
  ; worker_thread_cond
  ; should_terminate
  ; worker_thread
  }
79+
80+
(* [delete limiter] asks the worker thread to stop and blocks until it has
   exited. The flag is set and the condition signalled under the queue lock,
   so the worker cannot miss the wake-up between testing the flag and going
   to sleep. *)
let delete
    {process_queue_lock; worker_thread_cond; should_terminate; worker_thread; _}
    =
  let request_stop () =
    should_terminate := true ;
    Condition.signal worker_thread_cond
  in
  with_lock process_queue_lock request_stop ;
  Thread.join worker_thread
86+
87+
(* [submit_async limiter ~callback amount] runs [callback] under the rate
   limit, charging [amount] tokens, without blocking the caller on the
   token bucket.

   If nothing is queued and [Token_bucket.consume] succeeds, the callback
   runs right away on the calling thread. Otherwise the request is appended
   to the queue and the worker thread is woken; the callback then runs later
   on the worker thread.

   The callback should return quickly - if it is a longer task it is
   responsible for creating a thread to do the task, since a slow callback
   running on the worker thread delays every request queued behind it. *)
let submit_async
    {bucket; process_queue; process_queue_lock; worker_thread_cond; _}
    ~callback amount =
  let run_immediately =
    with_lock process_queue_lock (fun () ->
        (* The emptiness test comes first and [&&] short-circuits, so no
           tokens are taken ahead of requests that are already waiting. *)
        let immediate =
          Queue.is_empty process_queue && Token_bucket.consume bucket amount
        in
        if not immediate then (
          Queue.add (amount, callback) process_queue ;
          Condition.signal worker_thread_cond
        ) ;
        immediate
    )
  in
  (* The callback is invoked only after the lock has been released *)
  if run_immediately then
    callback ()
  else
    D.debug "rate limiting async call"
108+
109+
(* [submit_sync bucket_data ~callback amount] charges [amount] tokens and then
   runs [callback] on the calling thread, returning its result.

   If nothing is queued and [Token_bucket.consume] succeeds, the callback
   runs straight away. Otherwise a wake-up request is queued and the caller
   blocks on a private channel until the worker thread has paid the cost and
   sends on that channel. The channel carries only the wake-up ([unit]), not
   the callback's result. *)
let submit_sync bucket_data ~callback amount =
  let {bucket; process_queue; process_queue_lock; worker_thread_cond; _} =
    bucket_data
  in
  let wakeup_opt =
    with_lock process_queue_lock (fun () ->
        let granted =
          Queue.is_empty process_queue && Token_bucket.consume bucket amount
        in
        if granted then
          (* Tokens already taken: nothing to wait for *)
          None
        else
          let wakeup = Event.new_channel () in
          (* Runs on the worker thread once the cost has been paid *)
          let notify () = Event.sync (Event.send wakeup ()) in
          Queue.add (amount, notify) process_queue ;
          Condition.signal worker_thread_cond ;
          Some wakeup
    )
  in
  (* Wait outside the lock so the worker and other submitters can proceed *)
  Option.iter (fun wakeup -> Event.sync (Event.receive wakeup)) wakeup_opt ;
  callback ()
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
(*
2+
* Copyright (C) 2025 Cloud Software Group
3+
*
4+
* This program is free software; you can redistribute it and/or modify
5+
* it under the terms of the GNU Lesser General Public License as published
6+
* by the Free Software Foundation; version 2.1 only. with the special
7+
* exception on linking described in file LICENSE.
8+
*
9+
* This program is distributed in the hope that it will be useful,
10+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
* GNU Lesser General Public License for more details.
13+
*)
14+
15+
(** Rate limiter built on top of a token bucket. Provides async and sync
16+
submission of callbacks that are rate-limited. Uses a worker thread
17+
to process queued callbacks when tokens become available. *)
18+
19+
type t
20+
21+
val create : burst_size:float -> fill_rate:float -> t
22+
(** [create ~burst_size ~fill_rate] creates a new rate limiter with the given
23+
token bucket parameters. Raises [Failure] if the parameters are invalid
24+
(e.g. non-positive fill rate).
25+
@param burst_size Maximum number of tokens in the bucket
26+
@param fill_rate Number of tokens added per second *)
27+
28+
val delete : t -> unit
29+
(** [delete t] signals the worker thread to terminate and waits for it to
30+
finish processing any remaining queued callbacks. *)
31+
32+
val submit_async : t -> callback:(unit -> unit) -> float -> unit
33+
(** [submit_async t ~callback amount] submits a callback under rate limiting.
34+
If tokens are immediately available and no callbacks are queued, the
35+
callback runs synchronously on the calling thread. Otherwise it is
36+
enqueued and will be executed by a worker thread when tokens become
37+
available; in that case the call returns without waiting for the callback. *)
38+
39+
val submit_sync : t -> callback:(unit -> 'a) -> float -> 'a
40+
(** [submit_sync t ~callback amount] submits a callback under rate limiting
41+
and blocks until it completes, returning the callback's result. If tokens
42+
are immediately available and no callbacks are queued, the callback runs
43+
directly. Otherwise, the caller blocks until the worker thread signals
44+
that tokens are available, and then runs the callback on the calling thread. *)

0 commit comments

Comments
 (0)