diff --git a/dune-project b/dune-project index 94f67c4c19..43ebedd127 100644 --- a/dune-project +++ b/dune-project @@ -52,6 +52,13 @@ (name tgroup) (depends xapi-log xapi-stdext-unix)) +(package + (name xapi-rate-limit) + (synopsis "A simple token bucket-based rate limter for XAPI") + (depends + (ocaml (>= 4.12)) + xapi-log xapi-stdext-unix)) + (package (name xml-light2)) diff --git a/ocaml/libs/rate-limit/dune b/ocaml/libs/rate-limit/dune new file mode 100644 index 0000000000..418c055d8d --- /dev/null +++ b/ocaml/libs/rate-limit/dune @@ -0,0 +1,8 @@ +(library + (name xapi_rate_limit) + (public_name xapi-rate-limit) + + (libraries threads.posix mtime mtime.clock.os xapi-log xapi-stdext-threads clock) +) + + diff --git a/ocaml/libs/rate-limit/rate_limit.ml b/ocaml/libs/rate-limit/rate_limit.ml new file mode 100644 index 0000000000..6d5f6e540f --- /dev/null +++ b/ocaml/libs/rate-limit/rate_limit.ml @@ -0,0 +1,146 @@ +(* + * Copyright (C) 2025 Cloud Software Group + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published + * by the Free Software Foundation; version 2.1 only. with the special + * exception on linking described in file LICENSE. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + *) + +module D = Debug.Make (struct let name = __MODULE__ end) + +type t = { + bucket: Token_bucket.t + ; process_queue: + (float * (unit -> unit)) Queue.t (* contains token cost and callback *) + ; process_queue_lock: Mutex.t + ; worker_thread_cond: Condition.t + ; should_terminate: bool Atomic.t + (* Signal termination to worker thread. The worker thread will + process all remaining items in the queue before exiting. *) + ; worker_thread: Thread.t +} + +let with_lock = Xapi_stdext_threads.Threadext.Mutex.execute + +(* The worker thread is responsible for calling the callback when the token + amount becomes available *) +let rec worker_loop ~bucket ~process_queue ~process_queue_lock + ~worker_thread_cond ~should_terminate = + let process_item cost callback = + Token_bucket.delay_then_consume bucket cost ; + callback () + in + let item_opt = + with_lock process_queue_lock (fun () -> + while + Queue.is_empty process_queue && not (Atomic.get should_terminate) + do + Condition.wait worker_thread_cond process_queue_lock + done ; + Queue.take_opt process_queue + ) + in + match item_opt with + | None -> + (* Queue is empty only when termination was signalled *) + D.debug "%s: queue empty in deleted rate limiter; exiting" __FUNCTION__ + | Some (cost, callback) -> + process_item cost callback ; + worker_loop ~bucket ~process_queue ~process_queue_lock ~worker_thread_cond + ~should_terminate + +let create ~burst_size ~fill_rate = + let bucket = Token_bucket.create ~burst_size ~fill_rate in + let process_queue = Queue.create () in + let process_queue_lock = Mutex.create () in + let worker_thread_cond = Condition.create () in + let should_terminate = Atomic.make false in + let worker_thread = + Thread.create + (fun () -> + worker_loop ~bucket ~process_queue ~process_queue_lock + ~worker_thread_cond ~should_terminate + ) + () + in + { + bucket + ; process_queue + ; process_queue_lock + ; worker_thread_cond + ; should_terminate + ; worker_thread + } + +let delete data = + if Atomic.compare_and_set data.should_terminate false true then + Condition.signal data.worker_thread_cond ; + Thread.join data.worker_thread + +let check_not_terminated should_terminate = + if Atomic.get should_terminate then + invalid_arg "Rate_limit: submit called on a deleted rate limiter" + +(* The callback should return quickly - if it is a longer task it is + responsible for creating a thread to do the task *) +let submit_async + { + bucket + ; process_queue + ; process_queue_lock + ; worker_thread_cond + ; should_terminate + ; _ + } ~callback amount = + check_not_terminated should_terminate ; + let run_immediately = + with_lock process_queue_lock (fun () -> + let immediate = + Queue.is_empty process_queue && Token_bucket.consume bucket amount + in + if not immediate then ( + Queue.add (amount, callback) process_queue ; + Condition.signal worker_thread_cond + ) ; + immediate + ) + in + if run_immediately then + callback () + else + D.debug "%s: rate limiting call" __FUNCTION__ + +(* Block and execute on the same thread *) +let submit_sync bucket_data ~callback amount = + check_not_terminated bucket_data.should_terminate ; + let channel_opt = + with_lock bucket_data.process_queue_lock (fun () -> + if + Queue.is_empty bucket_data.process_queue + && Token_bucket.consume bucket_data.bucket amount + then + None + (* Can run callback immediately after releasing lock *) + else + (* Rate limited, need to retrieve function result via channel *) + let channel = Event.new_channel () in + Queue.add + (amount, fun () -> Event.sync (Event.send channel ())) + bucket_data.process_queue ; + Condition.signal bucket_data.worker_thread_cond ; + Some channel + ) + in + match channel_opt with + | None -> + callback () + | Some channel -> + D.debug "%s: rate limiting call" __FUNCTION__ ; + Event.sync (Event.receive channel) ; + callback () diff --git a/ocaml/libs/rate-limit/rate_limit.mli b/ocaml/libs/rate-limit/rate_limit.mli new file mode 100644 index 0000000000..ec80a5f0ab --- /dev/null +++ b/ocaml/libs/rate-limit/rate_limit.mli @@ -0,0 +1,47 @@ +(* + * Copyright (C) 2025 Cloud Software Group + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published + * by the Free Software Foundation; version 2.1 only. with the special + * exception on linking described in file LICENSE. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + *) + +(** Rate limiter built on top of a token bucket. Provides async and sync + submission of callbacks that are rate-limited. Uses a worker thread + to process queued callbacks when tokens become available. *) + +type t + +val create : burst_size:float -> fill_rate:float -> t +(** [create ~burst_size ~fill_rate] creates a new rate limiter with the given + token bucket parameters. + @raises Invalid_argument if the parameters are invalid + (e.g. non-positive fill rate). + @param burst_size Maximum number of tokens in the bucket + @param fill_rate Number of tokens added per second *) + +val delete : t -> unit +(** [delete t] signals the worker thread to terminate. The worker thread + processes any remaining queued callbacks, then exits. Blocks the caller + until the worker thread has finished. Subsequent calls to [submit_async] + or [submit_sync] will raise [Invalid_argument]. *) + +val submit_async : t -> callback:(unit -> unit) -> float -> unit +(** [submit_async t ~callback amount] submits a callback under rate limiting. + If tokens are immediately available and no callbacks are queued, the + callback runs synchronously on the calling thread. Otherwise it is + enqueued and will be executed by a worker thread when tokens become + available. Returns immediately. *) + +val submit_sync : t -> callback:(unit -> 'a) -> float -> 'a +(** [submit_sync t ~callback amount] submits a callback under rate limiting + and blocks until it completes, returning the callback's result. If tokens + are immediately available and no callbacks are queued, the callback runs + directly. Otherwise, the caller blocks until the worker thread signals + that tokens are available. *) diff --git a/ocaml/libs/rate-limit/token_bucket.ml b/ocaml/libs/rate-limit/token_bucket.ml new file mode 100644 index 0000000000..25a02c0963 --- /dev/null +++ b/ocaml/libs/rate-limit/token_bucket.ml @@ -0,0 +1,83 @@ +(* + * Copyright (C) 2025 Cloud Software Group + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published + * by the Free Software Foundation; version 2.1 only. with the special + * exception on linking described in file LICENSE. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + *) + +type state = {tokens: float; last_refill: Mtime.span} + +type t = {burst_size: float; fill_rate: float; state: state Atomic.t} + +let create_with_timestamp timestamp ~burst_size ~fill_rate = + if fill_rate <= 0. then + invalid_arg "Token_bucket.create: fill_rate must be positive" + else + let state = Atomic.make {tokens= burst_size; last_refill= timestamp} in + {burst_size; fill_rate; state} + +let create = create_with_timestamp (Mtime_clock.elapsed ()) + +let compute_tokens timestamp {tokens; last_refill} ~burst_size ~fill_rate = + let time_delta = Mtime.Span.abs_diff last_refill timestamp in + let time_delta_seconds = Mtime.Span.to_float_ns time_delta *. 1e-9 in + min burst_size (tokens +. (time_delta_seconds *. fill_rate)) + +let peek_with_timestamp timestamp tb = + let tb_state = Atomic.get tb.state in + compute_tokens timestamp tb_state ~burst_size:tb.burst_size + ~fill_rate:tb.fill_rate + +let peek tb = peek_with_timestamp (Mtime_clock.elapsed ()) tb + +let consume_with_timestamp get_time tb amount = + let rec try_consume () = + let timestamp = get_time () in + let old_state = Atomic.get tb.state in + let new_tokens = + compute_tokens timestamp old_state ~burst_size:tb.burst_size + ~fill_rate:tb.fill_rate + in + let success, final_tokens = + if new_tokens >= amount then + (true, new_tokens -. amount) + else + (false, new_tokens) + in + let new_state = {tokens= final_tokens; last_refill= timestamp} in + if Atomic.compare_and_set tb.state old_state new_state then + success + else + try_consume () + in + try_consume () + +let consume = consume_with_timestamp Mtime_clock.elapsed + +let get_delay_until_available_timestamp timestamp tb amount = + let {tokens; last_refill} = Atomic.get tb.state in + let current_tokens = + compute_tokens timestamp {tokens; last_refill} ~burst_size:tb.burst_size + ~fill_rate:tb.fill_rate + in + let required_tokens = max 0. (amount -. current_tokens) in + required_tokens /. tb.fill_rate + +let get_delay_until_available tb amount = + get_delay_until_available_timestamp (Mtime_clock.elapsed ()) tb amount + +(* This implementation only works when there is only one thread trying to + consume - fairness needs to be implemented on top of it with a queue. + If there is no contention, it should only delay once. *) +let rec delay_then_consume tb amount = + if not (consume tb amount) then ( + Thread.delay (get_delay_until_available tb amount) ; + delay_then_consume tb amount + ) diff --git a/ocaml/libs/rate-limit/token_bucket.mli b/ocaml/libs/rate-limit/token_bucket.mli new file mode 100644 index 0000000000..84f7603737 --- /dev/null +++ b/ocaml/libs/rate-limit/token_bucket.mli @@ -0,0 +1,112 @@ +(* + * Copyright (C) 2025 Cloud Software Group + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published + * by the Free Software Foundation; version 2.1 only. with the special + * exception on linking described in file LICENSE. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + *) + +(** This module implements a classic token-bucket rate limiter. Token buckets + contain tokens that are refilled over time, and can be consumed in a + thread-safe way. A token bucket accumulates [fill_rate] tokens per second, + up to [burst_size]. Consumers may take tokens (if available), or query when + enough tokens will become available. + + Token counts are represented as floats rather than integers to allow + fractional token costs and fine-grained refill rates. + + Token buckets implement rate limiting by allowing operations to proceed + only when sufficient tokens are available - otherwise, the operations can + be delayed until enough tokens are available. + + To avoid doing unnecessary work to refill the bucket, token amounts are + only updated when a consume operation is carried out. The buckets keep a + last_refill timestamp which is updated on consume in tandem with the token + counts, and informs how many tokens should be added by the bucket refill. + + We include versions of functions that take a timestamp as a parameter for + testing purposes only - consumers of this library should use the + timestamp-less versions. +*) + +type t + +val create : burst_size:float -> fill_rate:float -> t +(** Create token bucket with given parameters. + @raises Invalid_argument if the fill rate is 0 or negative. + @param burst_size Maximum number of tokens that can fit in the bucket + @param fill_rate Number of tokens added to the bucket per second + *) + +val peek : t -> float +(** Retrieve current token amount + @param tb Token bucket + @return Amount of tokens in the token bucket + *) + +val consume : t -> float -> bool +(** Consume tokens from the bucket in a thread-safe manner. + @param tb Token bucket + @param amount How many tokens to consume + @return Whether the tokens were successfully consumed + *) + +val get_delay_until_available : t -> float -> float +(** Get number of seconds that need to pass until bucket is expected to have + enough tokens to fulfil the request + @param tb Token bucket + @param amount How many tokens we want to consume + @return Number of seconds until tokens are available +*) + +val delay_then_consume : t -> float -> unit +(** [delay_then_consume tb amount] sleeps the calling thread until [amount] + tokens are available, then consumes them. Thread-safe but does not + guarantee fairness between competing callers. *) + +(**/**) + +(* Fuctions accepting a timestamp are meant for testing only *) + +val create_with_timestamp : + Mtime.span -> burst_size:float -> fill_rate:float -> t +(** Create token bucket with given parameters and supplied inital timestamp. + @raises Invalid_argument if the fill_rate is 0 or negative. + @param timestamp Initial timestamp + @param burst_size Maximum number of tokens that can fit in the bucket + @param fill_rate Number of tokens added to the bucket per second + *) + +val peek_with_timestamp : Mtime.span -> t -> float +(** Retrieve token amount in token bucket at given timestamp. + Undefined behaviour when [timestamp] <= [tb.timestamp] + @param timestamp Current time + @param tb Token bucket + @return Amount of tokens in the token bucket + *) + +val consume_with_timestamp : (unit -> Mtime.span) -> t -> float -> bool +(** Consume tokens from the bucket in a thread-safe manner, using supplied + function for obtaining the current time + @param get_time Function to obtain timestamp, e.g. Mtime_clock.elapsed + @param tb Token bucket + @param amount How many tokens to consume + @return Whether the tokens were successfully consumed + *) + +val get_delay_until_available_timestamp : Mtime.span -> t -> float -> float +(** Get number of seconds that need to pass until bucket is expected to have + enough tokens to fulfil the request + @param timestamp + @param tb Token bucket + @param amount How many tokens we want to consume + @return Number of seconds until tokens are available +*) + +(**/**) diff --git a/opam/xapi-rate-limit.opam b/opam/xapi-rate-limit.opam new file mode 100644 index 0000000000..7ae6315b41 --- /dev/null +++ b/opam/xapi-rate-limit.opam @@ -0,0 +1,31 @@ +# This file is generated by dune, edit dune-project instead +opam-version: "2.0" +synopsis: "A simple token bucket-based rate limter for XAPI" +maintainer: ["Xapi project maintainers"] +authors: ["xen-api@lists.xen.org"] +license: "LGPL-2.1-only WITH OCaml-LGPL-linking-exception" +homepage: "https://xapi-project.github.io/" +bug-reports: "https://github.com/xapi-project/xen-api/issues" +depends: [ + "dune" {>= "3.20"} + "ocaml" {>= "4.12"} + "xapi-log" + "xapi-stdext-unix" + "odoc" {with-doc} +] +build: [ + ["dune" "subst"] {dev} + [ + "dune" + "build" + "-p" + name + "-j" + jobs + "@install" + "@runtest" {with-test} + "@doc" {with-doc} + ] +] +dev-repo: "git+https://github.com/xapi-project/xen-api.git" +x-maintenance-intent: ["(latest)"]