Skip to content

Commit 9d76c1b

Browse files
committed
Rate limit: Implement token buckets
Token buckets form the core of the XAPI rate limiter. Token buckets hold tokens, which are refilled over time according to their refill rate up to a maximum. Each client making calls to XAPI will have one token bucket associated; each call takes tokens out of the bucket, and delays will be enforced such that any requests made to an empty bucket have to wait until the bucket has been refilled. The token bucket library implements functions for token bucket creation and thread-safe token consumption. Refills are calculated only when the bucket is observed by tracking a last_refill timestamp. Signed-off-by: Christian Pardillo Laursen <christian.pardillolaursen@citrix.com>
1 parent beede2a commit 9d76c1b

5 files changed

Lines changed: 241 additions & 0 deletions

File tree

dune-project

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,13 @@
5252
(name tgroup)
5353
(depends xapi-log xapi-stdext-unix))
5454

55+
(package
56+
(name xapi-rate-limit)
57+
(synopsis "A simple token bucket-based rate limter for XAPI")
58+
(depends
59+
(ocaml (>= 4.12))
60+
xapi-log xapi-stdext-unix))
61+
5562
(package
5663
(name xml-light2))
5764

ocaml/libs/rate-limit/dune

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
(library
2+
(name xapi_rate_limit)
3+
(public_name xapi-rate-limit)
4+
5+
(libraries threads.posix mtime mtime.clock.os xapi-log xapi-stdext-threads clock)
6+
)
7+
8+
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
(*
2+
* Copyright (C) 2025 Cloud Software Group
3+
*
4+
* This program is free software; you can redistribute it and/or modify
5+
* it under the terms of the GNU Lesser General Public License as published
6+
* by the Free Software Foundation; version 2.1 only. with the special
7+
* exception on linking described in file LICENSE.
8+
*
9+
* This program is distributed in the hope that it will be useful,
10+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
* GNU Lesser General Public License for more details.
13+
*)
14+
15+
type state = {tokens: float; last_refill: Mtime.span}
16+
17+
type t = {burst_size: float; fill_rate: float; state: state Atomic.t}
18+
19+
let create_with_timestamp timestamp ~burst_size ~fill_rate =
20+
if fill_rate <= 0. then
21+
invalid_arg "Token_bucket.create: fill_rate must be positive"
22+
else
23+
let state = Atomic.make {tokens= burst_size; last_refill= timestamp} in
24+
{burst_size; fill_rate; state}
25+
26+
let create = create_with_timestamp (Mtime_clock.elapsed ())
27+
28+
let compute_tokens timestamp {tokens; last_refill} ~burst_size ~fill_rate =
29+
let time_delta = Mtime.Span.abs_diff last_refill timestamp in
30+
let time_delta_seconds = Mtime.Span.to_float_ns time_delta *. 1e-9 in
31+
min burst_size (tokens +. (time_delta_seconds *. fill_rate))
32+
33+
let peek_with_timestamp timestamp tb =
34+
let tb_state = Atomic.get tb.state in
35+
compute_tokens timestamp tb_state ~burst_size:tb.burst_size
36+
~fill_rate:tb.fill_rate
37+
38+
let peek tb = peek_with_timestamp (Mtime_clock.elapsed ()) tb
39+
40+
let consume_with_timestamp get_time tb amount =
41+
let rec try_consume () =
42+
let timestamp = get_time () in
43+
let old_state = Atomic.get tb.state in
44+
let new_tokens =
45+
compute_tokens timestamp old_state ~burst_size:tb.burst_size
46+
~fill_rate:tb.fill_rate
47+
in
48+
let success, final_tokens =
49+
if new_tokens >= amount then
50+
(true, new_tokens -. amount)
51+
else
52+
(false, new_tokens)
53+
in
54+
let new_state = {tokens= final_tokens; last_refill= timestamp} in
55+
if Atomic.compare_and_set tb.state old_state new_state then
56+
success
57+
else
58+
try_consume ()
59+
in
60+
try_consume ()
61+
62+
let consume = consume_with_timestamp Mtime_clock.elapsed
63+
64+
let get_delay_until_available_timestamp timestamp tb amount =
65+
let {tokens; last_refill} = Atomic.get tb.state in
66+
let current_tokens =
67+
compute_tokens timestamp {tokens; last_refill} ~burst_size:tb.burst_size
68+
~fill_rate:tb.fill_rate
69+
in
70+
let required_tokens = max 0. (amount -. current_tokens) in
71+
required_tokens /. tb.fill_rate
72+
73+
let get_delay_until_available tb amount =
74+
get_delay_until_available_timestamp (Mtime_clock.elapsed ()) tb amount
75+
76+
(* This implementation only works when there is only one thread trying to
77+
consume - fairness needs to be implemented on top of it with a queue.
78+
If there is no contention, it should only delay once. *)
79+
let rec delay_then_consume tb amount =
80+
if not (consume tb amount) then (
81+
Thread.delay (get_delay_until_available tb amount) ;
82+
delay_then_consume tb amount
83+
)
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
(*
2+
* Copyright (C) 2025 Cloud Software Group
3+
*
4+
* This program is free software; you can redistribute it and/or modify
5+
* it under the terms of the GNU Lesser General Public License as published
6+
* by the Free Software Foundation; version 2.1 only. with the special
7+
* exception on linking described in file LICENSE.
8+
*
9+
* This program is distributed in the hope that it will be useful,
10+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
* GNU Lesser General Public License for more details.
13+
*)
14+
15+
(** This module implements a classic token-bucket rate limiter. Token buckets
16+
contain tokens that are refilled over time, and can be consumed in a
17+
thread-safe way. A token bucket accumulates [fill_rate] tokens per second,
18+
up to [burst_size]. Consumers may take tokens (if available), or query when
19+
enough tokens will become available.
20+
21+
Token counts are represented as floats rather than integers to allow
22+
fractional token costs and fine-grained refill rates.
23+
24+
Token buckets implement rate limiting by allowing operations to proceed
25+
only when sufficient tokens are available - otherwise, the operations can
26+
be delayed until enough tokens are available.
27+
28+
To avoid doing unnecessary work to refill the bucket, token amounts are
29+
only updated when a consume operation is carried out. The buckets keep a
30+
last_refill timestamp which is updated on consume in tandem with the token
31+
counts, and informs how many tokens should be added by the bucket refill.
32+
33+
We include versions of functions that take a timestamp as a parameter for
34+
testing purposes only - consumers of this library should use the
35+
timestamp-less versions.
36+
*)
37+
38+
type t
39+
40+
val create : burst_size:float -> fill_rate:float -> t
41+
(** Create token bucket with given parameters.
42+
@raises Invalid_argument if the fill rate is 0 or negative.
43+
@param burst_size Maximum number of tokens that can fit in the bucket
44+
@param fill_rate Number of tokens added to the bucket per second
45+
*)
46+
47+
val peek : t -> float
48+
(** Retrieve current token amount
49+
@param tb Token bucket
50+
@return Amount of tokens in the token bucket
51+
*)
52+
53+
val consume : t -> float -> bool
54+
(** Consume tokens from the bucket in a thread-safe manner.
55+
@param tb Token bucket
56+
@param amount How many tokens to consume
57+
@return Whether the tokens were successfully consumed
58+
*)
59+
60+
val get_delay_until_available : t -> float -> float
61+
(** Get number of seconds that need to pass until bucket is expected to have
62+
enough tokens to fulfil the request
63+
@param tb Token bucket
64+
@param amount How many tokens we want to consume
65+
@return Number of seconds until tokens are available
66+
*)
67+
68+
val delay_then_consume : t -> float -> unit
69+
(** [delay_then_consume tb amount] sleeps the calling thread until [amount]
70+
tokens are available, then consumes them. Thread-safe but does not
71+
guarantee fairness between competing callers. *)
72+
73+
(**/**)
74+
75+
(* Fuctions accepting a timestamp are meant for testing only *)
76+
77+
val create_with_timestamp :
78+
Mtime.span -> burst_size:float -> fill_rate:float -> t
79+
(** Create token bucket with given parameters and supplied inital timestamp.
80+
@raises Invalid_argument if the fill_rate is 0 or negative.
81+
@param timestamp Initial timestamp
82+
@param burst_size Maximum number of tokens that can fit in the bucket
83+
@param fill_rate Number of tokens added to the bucket per second
84+
*)
85+
86+
val peek_with_timestamp : Mtime.span -> t -> float
87+
(** Retrieve token amount in token bucket at given timestamp.
88+
Undefined behaviour when [timestamp] <= [tb.timestamp]
89+
@param timestamp Current time
90+
@param tb Token bucket
91+
@return Amount of tokens in the token bucket
92+
*)
93+
94+
val consume_with_timestamp : (unit -> Mtime.span) -> t -> float -> bool
95+
(** Consume tokens from the bucket in a thread-safe manner, using supplied
96+
function for obtaining the current time
97+
@param get_time Function to obtain timestamp, e.g. Mtime_clock.elapsed
98+
@param tb Token bucket
99+
@param amount How many tokens to consume
100+
@return Whether the tokens were successfully consumed
101+
*)
102+
103+
val get_delay_until_available_timestamp : Mtime.span -> t -> float -> float
104+
(** Get number of seconds that need to pass until bucket is expected to have
105+
enough tokens to fulfil the request
106+
@param timestamp
107+
@param tb Token bucket
108+
@param amount How many tokens we want to consume
109+
@return Number of seconds until tokens are available
110+
*)
111+
112+
(**/**)

opam/xapi-rate-limit.opam

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
# This file is generated by dune, edit dune-project instead
2+
opam-version: "2.0"
3+
synopsis: "A simple token bucket-based rate limter for XAPI"
4+
maintainer: ["Xapi project maintainers"]
5+
authors: ["xen-api@lists.xen.org"]
6+
license: "LGPL-2.1-only WITH OCaml-LGPL-linking-exception"
7+
homepage: "https://xapi-project.github.io/"
8+
bug-reports: "https://github.com/xapi-project/xen-api/issues"
9+
depends: [
10+
"dune" {>= "3.20"}
11+
"ocaml" {>= "4.12"}
12+
"xapi-log"
13+
"xapi-stdext-unix"
14+
"odoc" {with-doc}
15+
]
16+
build: [
17+
["dune" "subst"] {dev}
18+
[
19+
"dune"
20+
"build"
21+
"-p"
22+
name
23+
"-j"
24+
jobs
25+
"@install"
26+
"@runtest" {with-test}
27+
"@doc" {with-doc}
28+
]
29+
]
30+
dev-repo: "git+https://github.com/xapi-project/xen-api.git"
31+
x-maintenance-intent: ["(latest)"]

0 commit comments

Comments
 (0)