Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions dune-project
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,13 @@
(name tgroup)
(depends xapi-log xapi-stdext-unix))

; Token-bucket rate limiter library (ocaml/libs/rate-limit).
; The dependency list covers the packages providing the libraries linked by
; the library's dune stanza (mtime, clock, xapi-log, xapi-stdext-threads).
(package
 (name xapi-rate-limit)
 (synopsis "A simple token bucket-based rate limiter for XAPI")
 (depends
  (ocaml (>= 4.12))
  clock
  (mtime (>= 2.0.0))
  xapi-log
  xapi-stdext-threads
  xapi-stdext-unix))

; Package declaration for xml-light2; this stanza gives only the package name.
(package
(name xml-light2))

Expand Down
8 changes: 8 additions & 0 deletions ocaml/libs/rate-limit/dune
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
; Rate limiter library: [name] is the OCaml library name, [public_name] the
; name under which it is installed as part of the xapi-rate-limit package.
(library
(name xapi_rate_limit)
(public_name xapi-rate-limit)

; Libraries this library is linked against.
(libraries threads.posix mtime mtime.clock.os xapi-log xapi-stdext-threads clock)
)


146 changes: 146 additions & 0 deletions ocaml/libs/rate-limit/rate_limit.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
(*
* Copyright (C) 2025 Cloud Software Group
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published
* by the Free Software Foundation; version 2.1 only. with the special
* exception on linking described in file LICENSE.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*)

(* Logging functions for this module; the logger name is this module's own
   name, taken from [__MODULE__]. *)
module D = Debug.Make (struct let name = __MODULE__ end)

(* State of one rate limiter: a token bucket, plus the FIFO queue and worker
   thread that serve the submissions which could not be admitted at once. *)
type t = {
bucket: Token_bucket.t
(* Token bucket the submitted costs are consumed from *)
; process_queue:
(float * (unit -> unit)) Queue.t (* contains token cost and callback *)
; process_queue_lock: Mutex.t
(* Held while [process_queue] is inspected or modified *)
; worker_thread_cond: Condition.t
(* Signalled when an item is queued and when termination is requested *)
; should_terminate: bool Atomic.t
(* Signal termination to worker thread. The worker thread will
process all remaining items in the queue before exiting. *)
; worker_thread: Thread.t
(* Thread running [worker_loop]; joined by [delete] *)
}

(* [with_lock lock f] runs [f] through Threadext's [Mutex.execute] on [lock];
   a short local name for that helper. *)
let with_lock lock f = Xapi_stdext_threads.Threadext.Mutex.execute lock f

(* The worker thread serves the queue in FIFO order: for the item at the head
   of the queue it waits until the bucket holds the item's cost, consumes it,
   removes the item and runs its callback. It sleeps on [worker_thread_cond]
   while the queue is empty, and exits once termination has been signalled
   and the queue has been drained. *)
let rec worker_loop ~bucket ~process_queue ~process_queue_lock
    ~worker_thread_cond ~should_terminate =
  let run_callback callback =
    (* A raising callback must not kill the worker thread: the items queued
       behind it (and any [submit_sync] caller blocked on one of them) would
       otherwise never be served. Log the exception and carry on. *)
    try callback ()
    with e ->
      D.error "%s: callback raised %s; continuing" __FUNCTION__
        (Printexc.to_string e)
  in
  let head_opt =
    with_lock process_queue_lock (fun () ->
        while
          Queue.is_empty process_queue && not (Atomic.get should_terminate)
        do
          Condition.wait worker_thread_cond process_queue_lock
        done ;
        (* Only peek here. The item stays at the head of the queue while we
           wait for its tokens, so concurrent submitters still see a
           non-empty queue and line up behind it instead of taking the
           immediate path and consuming the tokens it is waiting for. *)
        Queue.peek_opt process_queue
    )
  in
  match head_opt with
  | None ->
      (* Queue is empty only when termination was signalled *)
      D.debug "%s: queue empty in deleted rate limiter; exiting" __FUNCTION__
  | Some (cost, callback) ->
      Token_bucket.delay_then_consume bucket cost ;
      (* This thread is the only one removing items, so the head is still the
         item peeked above. Its tokens are now paid for: drop it before
         running the callback. *)
      with_lock process_queue_lock (fun () ->
          ignore (Queue.take_opt process_queue)
      ) ;
      run_callback callback ;
      worker_loop ~bucket ~process_queue ~process_queue_lock ~worker_thread_cond
        ~should_terminate

(* [create ~burst_size ~fill_rate] builds the token bucket, an empty queue with
   its lock and condition variable, and starts the worker thread that serves
   the queue. The bucket is created first, so if [Token_bucket.create] raises
   no thread is started. *)
let create ~burst_size ~fill_rate =
  let bucket = Token_bucket.create ~burst_size ~fill_rate in
  let process_queue = Queue.create () in
  let process_queue_lock = Mutex.create () in
  let worker_thread_cond = Condition.create () in
  let should_terminate = Atomic.make false in
  (* Body of the worker thread: it shares the queue, lock, condition and
     termination flag with the record returned below. *)
  let run_worker () =
    worker_loop ~bucket ~process_queue ~process_queue_lock ~worker_thread_cond
      ~should_terminate
  in
  let worker_thread = Thread.create run_worker () in
  {
    bucket
  ; process_queue
  ; process_queue_lock
  ; worker_thread_cond
  ; should_terminate
  ; worker_thread
  }

(* [delete data] asks the worker thread to terminate and waits for it to
   finish. Only the first call flips [should_terminate] and wakes the worker;
   every call then joins the worker thread. *)
let delete data =
  if Atomic.compare_and_set data.should_terminate false true then
    (* Signal while holding the queue lock. The worker evaluates its wait
       predicate under this lock and only releases it inside
       [Condition.wait], so taking the lock here guarantees the worker is
       either already waiting (and is woken) or has not yet tested the flag
       (and will see it set). Signalling without the lock could slip between
       the test and the wait, losing the wakeup and hanging [Thread.join]. *)
    with_lock data.process_queue_lock (fun () ->
        Condition.signal data.worker_thread_cond
    ) ;
  Thread.join data.worker_thread

(* Guard used by the submit functions: raises [Invalid_argument] when the
   termination flag is set, and does nothing otherwise. *)
let check_not_terminated should_terminate =
  match Atomic.get should_terminate with
  | true ->
      invalid_arg "Rate_limit: submit called on a deleted rate limiter"
  | false ->
      ()

(* [submit_async t ~callback amount] runs [callback] under rate limiting
   without blocking on the token bucket.

   If nothing is queued and [amount] tokens can be consumed right away, the
   callback runs on the calling thread, after the queue lock is released.
   Otherwise [(amount, callback)] is appended to the queue and the worker
   thread is woken; the worker runs the callback later.

   The callback should return quickly - if it is a longer task it is
   responsible for creating a thread to do the task.

   Raises [Invalid_argument] if the rate limiter has been deleted. *)
let submit_async
    {
      bucket
    ; process_queue
    ; process_queue_lock
    ; worker_thread_cond
    ; should_terminate
    ; _
    } ~callback amount =
  let run_immediately =
    with_lock process_queue_lock (fun () ->
        (* Check for deletion under the queue lock: the worker decides to
           exit under this same lock, so an item can no longer be enqueued
           after the worker has gone (where it would never be run). *)
        check_not_terminated should_terminate ;
        let immediate =
          Queue.is_empty process_queue && Token_bucket.consume bucket amount
        in
        if not immediate then (
          Queue.add (amount, callback) process_queue ;
          Condition.signal worker_thread_cond
        ) ;
        immediate
    )
  in
  if run_immediately then
    callback ()
  else
    D.debug "%s: rate limiting call" __FUNCTION__

(* [submit_sync t ~callback amount] blocks until [amount] tokens have been
   granted, then runs [callback] on the calling thread and returns its result.

   If nothing is queued and the tokens can be consumed right away, the
   callback runs immediately. Otherwise a wake-up closure is queued for the
   worker thread, and the caller waits on a channel until the worker has
   consumed the tokens on its behalf.

   Raises [Invalid_argument] if the rate limiter has been deleted.

   TODO (from code review): measure and log how long a call was delayed, so
   that executions which take too long can be spotted. Suggestions made in
   review: the Xapi_stats module; longer term, OpenTelemetry metrics; and
   creating an OpenTelemetry span whenever rate limiting kicks in, so that it
   shows up in distributed traces. *)
let submit_sync bucket_data ~callback amount =
  let channel_opt =
    with_lock bucket_data.process_queue_lock (fun () ->
        (* Check for deletion under the queue lock: the worker decides to
           exit under this same lock, so we can never queue a wake-up that
           nobody will deliver and then block forever on the channel. *)
        check_not_terminated bucket_data.should_terminate ;
        if
          Queue.is_empty bucket_data.process_queue
          && Token_bucket.consume bucket_data.bucket amount
        then
          None
        (* Can run callback immediately after releasing lock *)
        else
          (* Rate limited: the worker notifies us through the channel once
             the tokens have been consumed *)
          let channel = Event.new_channel () in
          Queue.add
            (amount, fun () -> Event.sync (Event.send channel ()))
            bucket_data.process_queue ;
          Condition.signal bucket_data.worker_thread_cond ;
          Some channel
    )
  in
  match channel_opt with
  | None ->
      callback ()
  | Some channel ->
      D.debug "%s: rate limiting call" __FUNCTION__ ;
      Event.sync (Event.receive channel) ;
      callback ()
47 changes: 47 additions & 0 deletions ocaml/libs/rate-limit/rate_limit.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
(*
* Copyright (C) 2025 Cloud Software Group
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published
* by the Free Software Foundation; version 2.1 only. with the special
* exception on linking described in file LICENSE.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*)

(** Rate limiter built on top of a token bucket. Provides async and sync
submission of callbacks that are rate-limited. Uses a worker thread
to process queued callbacks when tokens become available. *)

type t
(** A rate limiter. Values are obtained from {!create} and released with
    {!delete}. *)

val create : burst_size:float -> fill_rate:float -> t
(** [create ~burst_size ~fill_rate] creates a new rate limiter with the given
    token bucket parameters and starts its worker thread. The bucket starts
    with [burst_size] tokens.
    @param burst_size Maximum number of tokens in the bucket
    @param fill_rate Number of tokens added per second
    @raise Invalid_argument if [fill_rate] is not strictly positive. *)

val delete : t -> unit
(** [delete t] signals the worker thread to terminate. The worker thread
    processes any remaining queued callbacks, then exits. Blocks the caller
    until the worker thread has finished. Subsequent calls to {!submit_async}
    or {!submit_sync} will raise [Invalid_argument]. *)

val submit_async : t -> callback:(unit -> unit) -> float -> unit
(** [submit_async t ~callback amount] submits a callback under rate limiting,
    at a cost of [amount] tokens. If tokens are immediately available and no
    callbacks are queued, the callback runs synchronously on the calling
    thread before [submit_async] returns. Otherwise it is enqueued and will be
    executed by the worker thread when tokens become available; in that case
    [submit_async] returns without waiting for it.

    The callback should return quickly; a longer task should start its own
    thread.
    @raise Invalid_argument if [t] has been deleted. *)

val submit_sync : t -> callback:(unit -> 'a) -> float -> 'a
(** [submit_sync t ~callback amount] submits a callback under rate limiting,
    at a cost of [amount] tokens, and blocks until it completes, returning the
    callback's result. If tokens are immediately available and no callbacks
    are queued, the callback runs directly. Otherwise, the caller blocks until
    the worker thread signals that tokens are available. In both cases the
    callback runs on the calling thread.
    @raise Invalid_argument if [t] has been deleted. *)
83 changes: 83 additions & 0 deletions ocaml/libs/rate-limit/token_bucket.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
(*
* Copyright (C) 2025 Cloud Software Group
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published
* by the Free Software Foundation; version 2.1 only. with the special
* exception on linking described in file LICENSE.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*)

(* Snapshot of a bucket: [tokens] is the token count as of [last_refill], a
   timestamp from the monotonic clock. Tokens accrued since then are computed
   on demand by [compute_tokens]. *)
type state = {tokens: float; last_refill: Mtime.span}

(* A token bucket: [burst_size] caps the token count, [fill_rate] is the
   number of tokens added per second, and [state] holds the current snapshot,
   replaced atomically by compare-and-set. *)
type t = {burst_size: float; fill_rate: float; state: state Atomic.t}

(* [create_with_timestamp timestamp ~burst_size ~fill_rate] builds a bucket
   that holds [burst_size] tokens as of [timestamp].
   Raises [Invalid_argument] when [fill_rate <= 0.]. *)
let create_with_timestamp timestamp ~burst_size ~fill_rate =
  if fill_rate <= 0. then
    invalid_arg "Token_bucket.create: fill_rate must be positive" ;
  let initial = {tokens= burst_size; last_refill= timestamp} in
  {burst_size; fill_rate; state= Atomic.make initial}

(* [create ~burst_size ~fill_rate] builds a full bucket stamped with the
   current monotonic time.
   The clock is read inside the function body so that each bucket gets its own
   creation time; as a partial application the clock would be read only once,
   when this module is initialised.
   Raises [Invalid_argument] when [fill_rate <= 0.]. *)
let create ~burst_size ~fill_rate =
  create_with_timestamp (Mtime_clock.elapsed ()) ~burst_size ~fill_rate

(* Token count at [timestamp] for a bucket snapshot: the stored count plus
   [fill_rate] tokens per second over the time separating [last_refill] and
   [timestamp], capped at [burst_size]. *)
let compute_tokens timestamp {tokens; last_refill} ~burst_size ~fill_rate =
  let elapsed_seconds =
    Mtime.Span.to_float_ns (Mtime.Span.abs_diff last_refill timestamp) *. 1e-9
  in
  let refilled = tokens +. (elapsed_seconds *. fill_rate) in
  min burst_size refilled

(* Token count the bucket would hold at [timestamp]; the bucket state is read
   but not modified. *)
let peek_with_timestamp timestamp {burst_size; fill_rate; state} =
  compute_tokens timestamp (Atomic.get state) ~burst_size ~fill_rate

(* Token count the bucket holds now, according to the monotonic clock. *)
let peek tb =
  let now = Mtime_clock.elapsed () in
  peek_with_timestamp now tb

(* [consume_with_timestamp get_time tb amount] tries to take [amount] tokens
   from [tb], using [get_time] as the clock. Returns [true] when the tokens
   were taken and [false] when the bucket did not hold enough. In both cases
   the stored snapshot is brought up to date with the refill computed for the
   current time. The update is a compare-and-set; it is retried with a fresh
   snapshot and timestamp when another thread changed the state meanwhile. *)
let consume_with_timestamp get_time tb amount =
  let rec try_consume () =
    (* Read the snapshot before the clock. The timestamp is then never older
       than the snapshot's [last_refill], even if another thread updated the
       bucket in between. In the opposite order a stale timestamp could be
       paired with a newer snapshot: [compute_tokens] takes an absolute
       difference, so the interval would be credited a second time, and
       [last_refill] would be moved backwards. *)
    let old_state = Atomic.get tb.state in
    let timestamp = get_time () in
    let new_tokens =
      compute_tokens timestamp old_state ~burst_size:tb.burst_size
        ~fill_rate:tb.fill_rate
    in
    let success = new_tokens >= amount in
    let final_tokens =
      if success then new_tokens -. amount else new_tokens
    in
    let new_state = {tokens= final_tokens; last_refill= timestamp} in
    if Atomic.compare_and_set tb.state old_state new_state then
      success
    else
      try_consume ()
  in
  try_consume ()

(* [consume tb amount] is [consume_with_timestamp] driven by the monotonic
   clock: [true] if [amount] tokens were taken, [false] otherwise. *)
let consume tb amount = consume_with_timestamp Mtime_clock.elapsed tb amount

(* Seconds to wait, counted from [timestamp], until the bucket holds [amount]
   tokens: the shortfall divided by the fill rate, or [0.] if the bucket
   already holds enough. The bucket state is not modified. *)
let get_delay_until_available_timestamp timestamp tb amount =
  let snapshot = Atomic.get tb.state in
  let available =
    compute_tokens timestamp snapshot ~burst_size:tb.burst_size
      ~fill_rate:tb.fill_rate
  in
  let shortfall = max 0. (amount -. available) in
  shortfall /. tb.fill_rate

(* Seconds to wait from now until the bucket holds [amount] tokens. *)
let get_delay_until_available tb amount =
  let now = Mtime_clock.elapsed () in
  get_delay_until_available_timestamp now tb amount

(* [delay_then_consume tb amount] blocks the calling thread until [amount]
   tokens have been consumed from [tb]: after each failed attempt it sleeps
   for the currently estimated refill time and tries again.

   This is only suitable when a single thread is consuming - fairness has to
   be provided on top of it with a queue. Without contention a single sleep
   should be enough. *)
let delay_then_consume tb amount =
  while not (consume tb amount) do
    Thread.delay (get_delay_until_available tb amount)
  done
Loading
Loading