Skip to content

Commit 7cfb013

Browse files
committed
Rate limit: Implement token buckets
Token buckets form the core of the XAPI rate limiter. Token buckets hold tokens, which are refilled over time according to their refill rate up to a maximum. Each client making calls to XAPI will have one token bucket associated; each call takes tokens out of the bucket, and delays will be enforced such that any requests made to an empty bucket have to wait until the bucket has been refilled. The token bucket library implements functions for token bucket creation and thread-safe token consumption. Refills are calculated only when the bucket is observed by tracking a last_refill timestamp. Signed-off-by: Christian Pardillo Laursen <christian.pardillolaursen@citrix.com>
1 parent beede2a commit 7cfb013

5 files changed

Lines changed: 235 additions & 0 deletions

File tree

dune-project

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,13 @@
5252
(name tgroup)
5353
(depends xapi-log xapi-stdext-unix))
5454

55+
(package
56+
(name rate-limit)
57+
(synopsis "Simple token bucket-based rate-limiting")
58+
(depends
59+
(ocaml (>= 4.12))
60+
xapi-log xapi-stdext-unix))
61+
5562
(package
5663
(name xml-light2))
5764

ocaml/libs/rate-limit/dune

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
(library
2+
(name rate_limit)
3+
(public_name rate-limit)
4+
(wrapped false)
5+
(libraries threads.posix mtime mtime.clock.os xapi-log xapi-stdext-threads clock)
6+
)
7+
8+
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
(*
2+
* Copyright (C) 2025 Cloud Software Group
3+
*
4+
* This program is free software; you can redistribute it and/or modify
5+
* it under the terms of the GNU Lesser General Public License as published
6+
* by the Free Software Foundation; version 2.1 only. with the special
7+
* exception on linking described in file LICENSE.
8+
*
9+
* This program is distributed in the hope that it will be useful,
10+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
* GNU Lesser General Public License for more details.
13+
*)
14+
15+
type state = {tokens: float; last_refill: Mtime.span}
16+
17+
type t = {burst_size: float; fill_rate: float; state: state Atomic.t}
18+
19+
let create_with_timestamp timestamp ~burst_size ~fill_rate =
20+
if fill_rate <= 0. then
21+
None
22+
else
23+
let state = Atomic.make {tokens= burst_size; last_refill= timestamp} in
24+
Some {burst_size; fill_rate; state}
25+
26+
let create = create_with_timestamp (Mtime_clock.elapsed ())
27+
28+
let compute_tokens timestamp {tokens; last_refill} ~burst_size ~fill_rate =
29+
let time_delta = Mtime.Span.abs_diff last_refill timestamp in
30+
let time_delta_seconds = Mtime.Span.to_float_ns time_delta *. 1e-9 in
31+
min burst_size (tokens +. (time_delta_seconds *. fill_rate))
32+
33+
let peek_with_timestamp timestamp tb =
34+
let tb_state = Atomic.get tb.state in
35+
compute_tokens timestamp tb_state ~burst_size:tb.burst_size
36+
~fill_rate:tb.fill_rate
37+
38+
let peek tb = peek_with_timestamp (Mtime_clock.elapsed ()) tb
39+
40+
let consume_with_timestamp get_time tb amount =
41+
let rec try_consume () =
42+
let timestamp = get_time () in
43+
let old_state = Atomic.get tb.state in
44+
let new_tokens =
45+
compute_tokens timestamp old_state ~burst_size:tb.burst_size
46+
~fill_rate:tb.fill_rate
47+
in
48+
let success, final_tokens =
49+
if new_tokens >= amount then
50+
(true, new_tokens -. amount)
51+
else
52+
(false, new_tokens)
53+
in
54+
let new_state = {tokens= final_tokens; last_refill= timestamp} in
55+
if Atomic.compare_and_set tb.state old_state new_state then
56+
success
57+
else
58+
try_consume ()
59+
in
60+
try_consume ()
61+
62+
let consume = consume_with_timestamp Mtime_clock.elapsed
63+
64+
let get_delay_until_available_timestamp timestamp tb amount =
65+
let {tokens; last_refill} = Atomic.get tb.state in
66+
let current_tokens =
67+
compute_tokens timestamp {tokens; last_refill} ~burst_size:tb.burst_size
68+
~fill_rate:tb.fill_rate
69+
in
70+
let required_tokens = max 0. (amount -. current_tokens) in
71+
required_tokens /. tb.fill_rate
72+
73+
let get_delay_until_available tb amount =
74+
get_delay_until_available_timestamp (Mtime_clock.elapsed ()) tb amount
75+
76+
(* This implementation only works when there is only one thread trying to
77+
consume - fairness needs to be implemented on top of it with a queue.
78+
If there is no contention, it should only delay once. *)
79+
let rec delay_then_consume tb amount =
80+
if not (consume tb amount) then (
81+
Thread.delay (get_delay_until_available tb amount) ;
82+
delay_then_consume tb amount
83+
)
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
(*
2+
* Copyright (C) 2025 Cloud Software Group
3+
*
4+
* This program is free software; you can redistribute it and/or modify
5+
* it under the terms of the GNU Lesser General Public License as published
6+
* by the Free Software Foundation; version 2.1 only. with the special
7+
* exception on linking described in file LICENSE.
8+
*
9+
* This program is distributed in the hope that it will be useful,
10+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
* GNU Lesser General Public License for more details.
13+
*)
14+
15+
(** This module implements a classic token-bucket rate limiter. Token buckets
16+
contain tokens that are refilled over time, and can be consumed in a
17+
thread-safe way. A token bucket accumulates [fill_rate] tokens per second,
18+
up to [burst_size]. Consumers may take tokens (if available), or query when
19+
enough tokens will become available.
20+
21+
Token buckets implement rate limiting by allowing operations to proceed
22+
only when sufficient tokens are available - otherwise, the operations can
23+
be delayed until enough tokens are available.
24+
25+
To avoid doing unnecessary work to refill the bucket, token amounts are
26+
only updated when a consume operation is carried out. The buckets keep a
27+
last_refill timestamp which is updated on consume in tandem with the token
28+
counts, and informs how many tokens should be added by the bucket refill.
29+
30+
We include versions of functions that take a timestamp as a parameter for
31+
testing purposes only - consumers of this library should use the
32+
timestamp-less versions.
33+
*)
34+
35+
type t
36+
37+
val create : burst_size:float -> fill_rate:float -> t option
38+
(** Create token bucket with given parameters.
39+
Returns None if the fill rate is 0 or negative.
40+
@param burst_size Maximum number of tokens that can fit in the bucket
41+
@param fill_rate Number of tokens added to the bucket per second
42+
*)
43+
44+
val peek : t -> float
45+
(** Retrieve current token amount
46+
@param tb Token bucket
47+
@return Amount of tokens in the token bucket
48+
*)
49+
50+
val consume : t -> float -> bool
51+
(** Consume tokens from the bucket in a thread-safe manner.
52+
@param tb Token bucket
53+
@param amount How many tokens to consume
54+
@return Whether the tokens were successfully consumed
55+
*)
56+
57+
val get_delay_until_available : t -> float -> float
58+
(** Get number of seconds that need to pass until bucket is expected to have
59+
enough tokens to fulfil the request
60+
@param tb Token bucket
61+
@param amount How many tokens we want to consume
62+
@return Number of seconds until tokens are available
63+
*)
64+
65+
val delay_then_consume : t -> float -> unit
66+
67+
(**/**)
68+
69+
(* Fuctions accepting a timestamp are meant for testing only *)
70+
71+
val create_with_timestamp :
72+
Mtime.span -> burst_size:float -> fill_rate:float -> t option
73+
(** Create token bucket with given parameters and supplied inital timestamp
74+
Returns None if the fill_rate is 0 or negative.
75+
@param timestamp Initial timestamp
76+
@param burst_size Maximum number of tokens that can fit in the bucket
77+
@param fill_rate Number of tokens added to the bucket per second
78+
*)
79+
80+
val peek_with_timestamp : Mtime.span -> t -> float
81+
(** Retrieve token amount in token bucket at given timestamp.
82+
Undefined behaviour when [timestamp] <= [tb.timestamp]
83+
@param timestamp Current time
84+
@param tb Token bucket
85+
@return Amount of tokens in the token bucket
86+
*)
87+
88+
val consume_with_timestamp : (unit -> Mtime.span) -> t -> float -> bool
89+
(** Consume tokens from the bucket in a thread-safe manner, using supplied
90+
function for obtaining the current time
91+
@param get_time Function to obtain timestamp, e.g. Mtime_clock.elapsed
92+
@param tb Token bucket
93+
@param amount How many tokens to consume
94+
@return Whether the tokens were successfully consumed
95+
*)
96+
97+
val get_delay_until_available_timestamp : Mtime.span -> t -> float -> float
98+
(** Get number of seconds that need to pass until bucket is expected to have
99+
enough tokens to fulfil the request
100+
@param timestamp
101+
@param tb Token bucket
102+
@param amount How many tokens we want to consume
103+
@return Number of seconds until tokens are available
104+
*)
105+
106+
(**/**)

opam/rate-limit.opam

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
# This file is generated by dune, edit dune-project instead
2+
opam-version: "2.0"
3+
synopsis: "Simple token bucket-based rate-limiting"
4+
maintainer: ["Xapi project maintainers"]
5+
authors: ["xen-api@lists.xen.org"]
6+
license: "LGPL-2.1-only WITH OCaml-LGPL-linking-exception"
7+
homepage: "https://xapi-project.github.io/"
8+
bug-reports: "https://github.com/xapi-project/xen-api/issues"
9+
depends: [
10+
"dune" {>= "3.20"}
11+
"ocaml" {>= "4.12"}
12+
"xapi-log"
13+
"xapi-stdext-unix"
14+
"odoc" {with-doc}
15+
]
16+
build: [
17+
["dune" "subst"] {dev}
18+
[
19+
"dune"
20+
"build"
21+
"-p"
22+
name
23+
"-j"
24+
jobs
25+
"@install"
26+
"@runtest" {with-test}
27+
"@doc" {with-doc}
28+
]
29+
]
30+
dev-repo: "git+https://github.com/xapi-project/xen-api.git"
31+
x-maintenance-intent: ["(latest)"]

0 commit comments

Comments
 (0)