From 9d76c1b23eb32b418072f6d9b6b64e2b84233e1a Mon Sep 17 00:00:00 2001 From: Christian Pardillo Laursen Date: Thu, 2 Apr 2026 10:43:41 +0100 Subject: [PATCH 1/2] Rate limit: Implement token buckets Token buckets form the core of the XAPI rate limiter. Token buckets hold tokens, which are refilled over time according to their refill rate up to a maximum. Each client making calls to XAPI will have one token bucket associated; each call takes tokens out of the bucket, and delays will be enforced such that any requests made to an empty bucket have to wait until the bucket has been refilled. The token bucket library implements functions for token bucket creation and thread-safe token consumption. Refills are calculated only when the bucket is observed by tracking a last_refill timestamp. Signed-off-by: Christian Pardillo Laursen --- dune-project | 7 ++ ocaml/libs/rate-limit/dune | 8 ++ ocaml/libs/rate-limit/token_bucket.ml | 83 ++++++++++++++++++ ocaml/libs/rate-limit/token_bucket.mli | 112 +++++++++++++++++++++++++ opam/xapi-rate-limit.opam | 31 +++++++ 5 files changed, 241 insertions(+) create mode 100644 ocaml/libs/rate-limit/dune create mode 100644 ocaml/libs/rate-limit/token_bucket.ml create mode 100644 ocaml/libs/rate-limit/token_bucket.mli create mode 100644 opam/xapi-rate-limit.opam diff --git a/dune-project b/dune-project index 94f67c4c19..43ebedd127 100644 --- a/dune-project +++ b/dune-project @@ -52,6 +52,13 @@ (name tgroup) (depends xapi-log xapi-stdext-unix)) +(package + (name xapi-rate-limit) + (synopsis "A simple token bucket-based rate limter for XAPI") + (depends + (ocaml (>= 4.12)) + xapi-log xapi-stdext-unix)) + (package (name xml-light2)) diff --git a/ocaml/libs/rate-limit/dune b/ocaml/libs/rate-limit/dune new file mode 100644 index 0000000000..418c055d8d --- /dev/null +++ b/ocaml/libs/rate-limit/dune @@ -0,0 +1,8 @@ +(library + (name xapi_rate_limit) + (public_name xapi-rate-limit) + + (libraries threads.posix mtime mtime.clock.os xapi-log xapi-stdext-threads clock) +) + + diff --git a/ocaml/libs/rate-limit/token_bucket.ml b/ocaml/libs/rate-limit/token_bucket.ml new file mode 100644 index 0000000000..25a02c0963 --- /dev/null +++ b/ocaml/libs/rate-limit/token_bucket.ml @@ -0,0 +1,83 @@ +(* + * Copyright (C) 2025 Cloud Software Group + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published + * by the Free Software Foundation; version 2.1 only. with the special + * exception on linking described in file LICENSE. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + *) + +type state = {tokens: float; last_refill: Mtime.span} + +type t = {burst_size: float; fill_rate: float; state: state Atomic.t} + +let create_with_timestamp timestamp ~burst_size ~fill_rate = + if fill_rate <= 0. then + invalid_arg "Token_bucket.create: fill_rate must be positive" + else + let state = Atomic.make {tokens= burst_size; last_refill= timestamp} in + {burst_size; fill_rate; state} + +let create = create_with_timestamp (Mtime_clock.elapsed ()) + +let compute_tokens timestamp {tokens; last_refill} ~burst_size ~fill_rate = + let time_delta = Mtime.Span.abs_diff last_refill timestamp in + let time_delta_seconds = Mtime.Span.to_float_ns time_delta *. 1e-9 in + min burst_size (tokens +. (time_delta_seconds *. fill_rate)) + +let peek_with_timestamp timestamp tb = + let tb_state = Atomic.get tb.state in + compute_tokens timestamp tb_state ~burst_size:tb.burst_size + ~fill_rate:tb.fill_rate + +let peek tb = peek_with_timestamp (Mtime_clock.elapsed ()) tb + +let consume_with_timestamp get_time tb amount = + let rec try_consume () = + let timestamp = get_time () in + let old_state = Atomic.get tb.state in + let new_tokens = + compute_tokens timestamp old_state ~burst_size:tb.burst_size + ~fill_rate:tb.fill_rate + in + let success, final_tokens = + if new_tokens >= amount then + (true, new_tokens -. amount) + else + (false, new_tokens) + in + let new_state = {tokens= final_tokens; last_refill= timestamp} in + if Atomic.compare_and_set tb.state old_state new_state then + success + else + try_consume () + in + try_consume () + +let consume = consume_with_timestamp Mtime_clock.elapsed + +let get_delay_until_available_timestamp timestamp tb amount = + let {tokens; last_refill} = Atomic.get tb.state in + let current_tokens = + compute_tokens timestamp {tokens; last_refill} ~burst_size:tb.burst_size + ~fill_rate:tb.fill_rate + in + let required_tokens = max 0. (amount -. current_tokens) in + required_tokens /. tb.fill_rate + +let get_delay_until_available tb amount = + get_delay_until_available_timestamp (Mtime_clock.elapsed ()) tb amount + +(* This implementation only works when there is only one thread trying to + consume - fairness needs to be implemented on top of it with a queue. + If there is no contention, it should only delay once. *) +let rec delay_then_consume tb amount = + if not (consume tb amount) then ( + Thread.delay (get_delay_until_available tb amount) ; + delay_then_consume tb amount + ) diff --git a/ocaml/libs/rate-limit/token_bucket.mli b/ocaml/libs/rate-limit/token_bucket.mli new file mode 100644 index 0000000000..84f7603737 --- /dev/null +++ b/ocaml/libs/rate-limit/token_bucket.mli @@ -0,0 +1,112 @@ +(* + * Copyright (C) 2025 Cloud Software Group + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published + * by the Free Software Foundation; version 2.1 only. with the special + * exception on linking described in file LICENSE. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + *) + +(** This module implements a classic token-bucket rate limiter. Token buckets + contain tokens that are refilled over time, and can be consumed in a + thread-safe way. A token bucket accumulates [fill_rate] tokens per second, + up to [burst_size]. Consumers may take tokens (if available), or query when + enough tokens will become available. + + Token counts are represented as floats rather than integers to allow + fractional token costs and fine-grained refill rates. + + Token buckets implement rate limiting by allowing operations to proceed + only when sufficient tokens are available - otherwise, the operations can + be delayed until enough tokens are available. + + To avoid doing unnecessary work to refill the bucket, token amounts are + only updated when a consume operation is carried out. The buckets keep a + last_refill timestamp which is updated on consume in tandem with the token + counts, and informs how many tokens should be added by the bucket refill. + + We include versions of functions that take a timestamp as a parameter for + testing purposes only - consumers of this library should use the + timestamp-less versions. +*) + +type t + +val create : burst_size:float -> fill_rate:float -> t +(** Create token bucket with given parameters. + @raises Invalid_argument if the fill rate is 0 or negative. + @param burst_size Maximum number of tokens that can fit in the bucket + @param fill_rate Number of tokens added to the bucket per second + *) + +val peek : t -> float +(** Retrieve current token amount + @param tb Token bucket + @return Amount of tokens in the token bucket + *) + +val consume : t -> float -> bool +(** Consume tokens from the bucket in a thread-safe manner. + @param tb Token bucket + @param amount How many tokens to consume + @return Whether the tokens were successfully consumed + *) + +val get_delay_until_available : t -> float -> float +(** Get number of seconds that need to pass until bucket is expected to have + enough tokens to fulfil the request + @param tb Token bucket + @param amount How many tokens we want to consume + @return Number of seconds until tokens are available +*) + +val delay_then_consume : t -> float -> unit +(** [delay_then_consume tb amount] sleeps the calling thread until [amount] + tokens are available, then consumes them. Thread-safe but does not + guarantee fairness between competing callers. *) + +(**/**) + +(* Fuctions accepting a timestamp are meant for testing only *) + +val create_with_timestamp : + Mtime.span -> burst_size:float -> fill_rate:float -> t +(** Create token bucket with given parameters and supplied inital timestamp. + @raises Invalid_argument if the fill_rate is 0 or negative. + @param timestamp Initial timestamp + @param burst_size Maximum number of tokens that can fit in the bucket + @param fill_rate Number of tokens added to the bucket per second + *) + +val peek_with_timestamp : Mtime.span -> t -> float +(** Retrieve token amount in token bucket at given timestamp. + Undefined behaviour when [timestamp] <= [tb.timestamp] + @param timestamp Current time + @param tb Token bucket + @return Amount of tokens in the token bucket + *) + +val consume_with_timestamp : (unit -> Mtime.span) -> t -> float -> bool +(** Consume tokens from the bucket in a thread-safe manner, using supplied + function for obtaining the current time + @param get_time Function to obtain timestamp, e.g. Mtime_clock.elapsed + @param tb Token bucket + @param amount How many tokens to consume + @return Whether the tokens were successfully consumed + *) + +val get_delay_until_available_timestamp : Mtime.span -> t -> float -> float +(** Get number of seconds that need to pass until bucket is expected to have + enough tokens to fulfil the request + @param timestamp + @param tb Token bucket + @param amount How many tokens we want to consume + @return Number of seconds until tokens are available +*) + +(**/**) diff --git a/opam/xapi-rate-limit.opam b/opam/xapi-rate-limit.opam new file mode 100644 index 0000000000..7ae6315b41 --- /dev/null +++ b/opam/xapi-rate-limit.opam @@ -0,0 +1,31 @@ +# This file is generated by dune, edit dune-project instead +opam-version: "2.0" +synopsis: "A simple token bucket-based rate limter for XAPI" +maintainer: ["Xapi project maintainers"] +authors: ["xen-api@lists.xen.org"] +license: "LGPL-2.1-only WITH OCaml-LGPL-linking-exception" +homepage: "https://xapi-project.github.io/" +bug-reports: "https://github.com/xapi-project/xen-api/issues" +depends: [ + "dune" {>= "3.20"} + "ocaml" {>= "4.12"} + "xapi-log" + "xapi-stdext-unix" + "odoc" {with-doc} +] +build: [ + ["dune" "subst"] {dev} + [ + "dune" + "build" + "-p" + name + "-j" + jobs + "@install" + "@runtest" {with-test} + "@doc" {with-doc} + ] +] +dev-repo: "git+https://github.com/xapi-project/xen-api.git" +x-maintenance-intent: ["(latest)"] From 7e072bcb565043f7289536b0fbb48fb1a654e5f4 Mon Sep 17 00:00:00 2001 From: Christian Pardillo Laursen Date: Thu, 2 Apr 2026 10:45:11 +0100 Subject: [PATCH 2/2] Rate limit: Implement queueing mechanism Token buckets form the basis of the rate limiting, but on their own they lack a way of enabling fairness when waiting. For this, we add a separate queuing mechanism which registers callbacks for requests that are waiting on a token bucket to refill. A worker thread is tasked with awakening requests as enough tokens are placed back in the bucket. We expose an interface for submitting requests to a queue, both in synchronous and asynchronous mode. Signed-off-by: Christian Pardillo Laursen --- ocaml/libs/rate-limit/rate_limit.ml | 146 +++++++++++++++++++++++++++ ocaml/libs/rate-limit/rate_limit.mli | 47 +++++++++ 2 files changed, 193 insertions(+) create mode 100644 ocaml/libs/rate-limit/rate_limit.ml create mode 100644 ocaml/libs/rate-limit/rate_limit.mli diff --git a/ocaml/libs/rate-limit/rate_limit.ml b/ocaml/libs/rate-limit/rate_limit.ml new file mode 100644 index 0000000000..6d5f6e540f --- /dev/null +++ b/ocaml/libs/rate-limit/rate_limit.ml @@ -0,0 +1,146 @@ +(* + * Copyright (C) 2025 Cloud Software Group + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published + * by the Free Software Foundation; version 2.1 only. with the special + * exception on linking described in file LICENSE. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + *) + +module D = Debug.Make (struct let name = __MODULE__ end) + +type t = { + bucket: Token_bucket.t + ; process_queue: + (float * (unit -> unit)) Queue.t (* contains token cost and callback *) + ; process_queue_lock: Mutex.t + ; worker_thread_cond: Condition.t + ; should_terminate: bool Atomic.t + (* Signal termination to worker thread. The worker thread will + process all remaining items in the queue before exiting. *) + ; worker_thread: Thread.t +} + +let with_lock = Xapi_stdext_threads.Threadext.Mutex.execute + +(* The worker thread is responsible for calling the callback when the token + amount becomes available *) +let rec worker_loop ~bucket ~process_queue ~process_queue_lock + ~worker_thread_cond ~should_terminate = + let process_item cost callback = + Token_bucket.delay_then_consume bucket cost ; + callback () + in + let item_opt = + with_lock process_queue_lock (fun () -> + while + Queue.is_empty process_queue && not (Atomic.get should_terminate) + do + Condition.wait worker_thread_cond process_queue_lock + done ; + Queue.take_opt process_queue + ) + in + match item_opt with + | None -> + (* Queue is empty only when termination was signalled *) + D.debug "%s: queue empty in deleted rate limiter; exiting" __FUNCTION__ + | Some (cost, callback) -> + process_item cost callback ; + worker_loop ~bucket ~process_queue ~process_queue_lock ~worker_thread_cond + ~should_terminate + +let create ~burst_size ~fill_rate = + let bucket = Token_bucket.create ~burst_size ~fill_rate in + let process_queue = Queue.create () in + let process_queue_lock = Mutex.create () in + let worker_thread_cond = Condition.create () in + let should_terminate = Atomic.make false in + let worker_thread = + Thread.create + (fun () -> + worker_loop ~bucket ~process_queue ~process_queue_lock + ~worker_thread_cond ~should_terminate + ) + () + in + { + bucket + ; process_queue + ; process_queue_lock + ; worker_thread_cond + ; should_terminate + ; worker_thread + } + +let delete data = + if Atomic.compare_and_set data.should_terminate false true then + Condition.signal data.worker_thread_cond ; + Thread.join data.worker_thread + +let check_not_terminated should_terminate = + if Atomic.get should_terminate then + invalid_arg "Rate_limit: submit called on a deleted rate limiter" + +(* The callback should return quickly - if it is a longer task it is + responsible for creating a thread to do the task *) +let submit_async + { + bucket + ; process_queue + ; process_queue_lock + ; worker_thread_cond + ; should_terminate + ; _ + } ~callback amount = + check_not_terminated should_terminate ; + let run_immediately = + with_lock process_queue_lock (fun () -> + let immediate = + Queue.is_empty process_queue && Token_bucket.consume bucket amount + in + if not immediate then ( + Queue.add (amount, callback) process_queue ; + Condition.signal worker_thread_cond + ) ; + immediate + ) + in + if run_immediately then + callback () + else + D.debug "%s: rate limiting call" __FUNCTION__ + +(* Block and execute on the same thread *) +let submit_sync bucket_data ~callback amount = + check_not_terminated bucket_data.should_terminate ; + let channel_opt = + with_lock bucket_data.process_queue_lock (fun () -> + if + Queue.is_empty bucket_data.process_queue + && Token_bucket.consume bucket_data.bucket amount + then + None + (* Can run callback immediately after releasing lock *) + else + (* Rate limited, need to retrieve function result via channel *) + let channel = Event.new_channel () in + Queue.add + (amount, fun () -> Event.sync (Event.send channel ())) + bucket_data.process_queue ; + Condition.signal bucket_data.worker_thread_cond ; + Some channel + ) + in + match channel_opt with + | None -> + callback () + | Some channel -> + D.debug "%s: rate limiting call" __FUNCTION__ ; + Event.sync (Event.receive channel) ; + callback () diff --git a/ocaml/libs/rate-limit/rate_limit.mli b/ocaml/libs/rate-limit/rate_limit.mli new file mode 100644 index 0000000000..ec80a5f0ab --- /dev/null +++ b/ocaml/libs/rate-limit/rate_limit.mli @@ -0,0 +1,47 @@ +(* + * Copyright (C) 2025 Cloud Software Group + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published + * by the Free Software Foundation; version 2.1 only. with the special + * exception on linking described in file LICENSE. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + *) + +(** Rate limiter built on top of a token bucket. Provides async and sync + submission of callbacks that are rate-limited. Uses a worker thread + to process queued callbacks when tokens become available. *) + +type t + +val create : burst_size:float -> fill_rate:float -> t +(** [create ~burst_size ~fill_rate] creates a new rate limiter with the given + token bucket parameters. + @raises Invalid_argument if the parameters are invalid + (e.g. non-positive fill rate). + @param burst_size Maximum number of tokens in the bucket + @param fill_rate Number of tokens added per second *) + +val delete : t -> unit +(** [delete t] signals the worker thread to terminate. The worker thread + processes any remaining queued callbacks, then exits. Blocks the caller + until the worker thread has finished. Subsequent calls to [submit_async] + or [submit_sync] will raise [Invalid_argument]. *) + +val submit_async : t -> callback:(unit -> unit) -> float -> unit +(** [submit_async t ~callback amount] submits a callback under rate limiting. + If tokens are immediately available and no callbacks are queued, the + callback runs synchronously on the calling thread. Otherwise it is + enqueued and will be executed by a worker thread when tokens become + available. Returns immediately. *) + +val submit_sync : t -> callback:(unit -> 'a) -> float -> 'a +(** [submit_sync t ~callback amount] submits a callback under rate limiting + and blocks until it completes, returning the callback's result. If tokens + are immediately available and no callbacks are queued, the callback runs + directly. Otherwise, the caller blocks until the worker thread signals + that tokens are available. *)