Skip to content

Commit fa65aaf

Browse files
committed
fixup! Address PR comments
Signed-off-by: Christian Pardillo Laursen <christian.pardillolaursen@citrix.com>
1 parent 69ee866 commit fa65aaf

5 files changed

Lines changed: 60 additions & 40 deletions

File tree

ocaml/libs/rate-limit/dune

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
(library
22
(name xapi_rate_limit)
33
(public_name xapi-rate-limit)
4-
(wrapped false)
4+
55
(libraries threads.posix mtime mtime.clock.os xapi-log xapi-stdext-threads clock)
66
)
77

ocaml/libs/rate-limit/rate_limit.ml

Lines changed: 40 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,17 @@
1212
* GNU Lesser General Public License for more details.
1313
*)
1414

15-
module D = Debug.Make (struct let name = "rate_limit" end)
15+
module D = Debug.Make (struct let name = __MODULE__ end)
1616

1717
type t = {
1818
bucket: Token_bucket.t
1919
; process_queue:
2020
(float * (unit -> unit)) Queue.t (* contains token cost and callback *)
2121
; process_queue_lock: Mutex.t
2222
; worker_thread_cond: Condition.t
23-
; should_terminate: bool ref (* signal termination to worker thread *)
23+
; should_terminate: bool ref
24+
(* Signal termination to worker thread. The worker thread will
25+
process all remaining items in the queue before exiting. *)
2426
; worker_thread: Thread.t
2527
}
2628

@@ -52,30 +54,27 @@ let rec worker_loop ~bucket ~process_queue ~process_queue_lock
5254
~should_terminate
5355

5456
let create ~burst_size ~fill_rate =
55-
match Token_bucket.create ~burst_size ~fill_rate with
56-
| Some bucket ->
57-
let process_queue = Queue.create () in
58-
let process_queue_lock = Mutex.create () in
59-
let worker_thread_cond = Condition.create () in
60-
let should_terminate = ref false in
61-
let worker_thread =
62-
Thread.create
63-
(fun () ->
64-
worker_loop ~bucket ~process_queue ~process_queue_lock
65-
~worker_thread_cond ~should_terminate
66-
)
67-
()
68-
in
69-
{
70-
bucket
71-
; process_queue
72-
; process_queue_lock
73-
; worker_thread_cond
74-
; should_terminate
75-
; worker_thread
76-
}
77-
| None ->
78-
raise (Failure "Invalid token bucket parameters")
57+
let bucket = Token_bucket.create ~burst_size ~fill_rate in
58+
let process_queue = Queue.create () in
59+
let process_queue_lock = Mutex.create () in
60+
let worker_thread_cond = Condition.create () in
61+
let should_terminate = ref false in
62+
let worker_thread =
63+
Thread.create
64+
(fun () ->
65+
worker_loop ~bucket ~process_queue ~process_queue_lock
66+
~worker_thread_cond ~should_terminate
67+
)
68+
()
69+
in
70+
{
71+
bucket
72+
; process_queue
73+
; process_queue_lock
74+
; worker_thread_cond
75+
; should_terminate
76+
; worker_thread
77+
}
7978

8079
let delete data =
8180
with_lock data.process_queue_lock (fun () ->
@@ -84,11 +83,22 @@ let delete data =
8483
) ;
8584
Thread.join data.worker_thread
8685

86+
let check_not_terminated should_terminate =
87+
if !should_terminate then
88+
invalid_arg "Rate_limit: submit called on a deleted rate limiter"
89+
8790
(* The callback should return quickly - if it is a longer task it is
8891
responsible for creating a thread to do the task *)
8992
let submit_async
90-
({bucket; process_queue; process_queue_lock; worker_thread_cond; _} as _data)
91-
~callback amount =
93+
{
94+
bucket
95+
; process_queue
96+
; process_queue_lock
97+
; worker_thread_cond
98+
; should_terminate
99+
; _
100+
} ~callback amount =
101+
check_not_terminated should_terminate ;
92102
let run_immediately =
93103
with_lock process_queue_lock (fun () ->
94104
let immediate =
@@ -104,10 +114,11 @@ let submit_async
104114
if run_immediately then
105115
callback ()
106116
else
107-
D.debug "rate limiting sync call"
117+
D.debug "%s: rate limiting call" __FUNCTION__
108118

109119
(* Block and execute on the same thread *)
110120
let submit_sync bucket_data ~callback amount =
121+
check_not_terminated bucket_data.should_terminate ;
111122
let channel_opt =
112123
with_lock bucket_data.process_queue_lock (fun () ->
113124
if

ocaml/libs/rate-limit/rate_limit.mli

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,17 @@ type t
2020

2121
val create : burst_size:float -> fill_rate:float -> t
2222
(** [create ~burst_size ~fill_rate] creates a new rate limiter with the given
23-
token bucket parameters. Raises [Failure] if the parameters are invalid
23+
token bucket parameters.
24+
@raises Invalid_argument if the parameters are invalid
2425
(e.g. non-positive fill rate).
2526
@param burst_size Maximum number of tokens in the bucket
2627
@param fill_rate Number of tokens added per second *)
2728

2829
val delete : t -> unit
29-
(** [delete t] signals the worker thread to terminate and waits for it to
30-
finish processing any remaining queued callbacks. *)
30+
(** [delete t] signals the worker thread to terminate. The worker thread
31+
processes any remaining queued callbacks, then exits. Blocks the caller
32+
until the worker thread has finished. Subsequent calls to [submit_async]
33+
or [submit_sync] will raise [Invalid_argument]. *)
3134

3235
val submit_async : t -> callback:(unit -> unit) -> float -> unit
3336
(** [submit_async t ~callback amount] submits a callback under rate limiting.

ocaml/libs/rate-limit/token_bucket.ml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,10 @@ type t = {burst_size: float; fill_rate: float; state: state Atomic.t}
1818

1919
let create_with_timestamp timestamp ~burst_size ~fill_rate =
2020
if fill_rate <= 0. then
21-
None
21+
invalid_arg "Token_bucket.create: fill_rate must be positive"
2222
else
2323
let state = Atomic.make {tokens= burst_size; last_refill= timestamp} in
24-
Some {burst_size; fill_rate; state}
24+
{burst_size; fill_rate; state}
2525

2626
let create = create_with_timestamp (Mtime_clock.elapsed ())
2727

ocaml/libs/rate-limit/token_bucket.mli

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@
1818
up to [burst_size]. Consumers may take tokens (if available), or query when
1919
enough tokens will become available.
2020
21+
Token counts are represented as floats rather than integers to allow
22+
fractional token costs and fine-grained refill rates.
23+
2124
Token buckets implement rate limiting by allowing operations to proceed
2225
only when sufficient tokens are available - otherwise, the operations can
2326
be delayed until enough tokens are available.
@@ -34,9 +37,9 @@
3437

3538
type t
3639

37-
val create : burst_size:float -> fill_rate:float -> t option
40+
val create : burst_size:float -> fill_rate:float -> t
3841
(** Create token bucket with given parameters.
39-
Returns None if the fill rate is 0 or negative.
42+
@raises Invalid_argument if the fill rate is 0 or negative.
4043
@param burst_size Maximum number of tokens that can fit in the bucket
4144
@param fill_rate Number of tokens added to the bucket per second
4245
*)
@@ -63,15 +66,18 @@ val get_delay_until_available : t -> float -> float
6366
*)
6467

6568
val delay_then_consume : t -> float -> unit
69+
(** [delay_then_consume tb amount] sleeps the calling thread until [amount]
70+
tokens are available, then consumes them. Thread-safe but does not
71+
guarantee fairness between competing callers. *)
6672

6773
(**/**)
6874

6975
(* Fuctions accepting a timestamp are meant for testing only *)
7076

7177
val create_with_timestamp :
72-
Mtime.span -> burst_size:float -> fill_rate:float -> t option
73-
(** Create token bucket with given parameters and supplied inital timestamp
74-
Returns None if the fill_rate is 0 or negative.
78+
Mtime.span -> burst_size:float -> fill_rate:float -> t
79+
(** Create token bucket with given parameters and supplied inital timestamp.
80+
@raises Invalid_argument if the fill_rate is 0 or negative.
7581
@param timestamp Initial timestamp
7682
@param burst_size Maximum number of tokens that can fit in the bucket
7783
@param fill_rate Number of tokens added to the bucket per second

0 commit comments

Comments
 (0)