-
Notifications
You must be signed in to change notification settings - Fork 299
Rate limit: Implement token buckets #6983
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
cplaursen
merged 2 commits into
xapi-project:feature/throttling2
from
cplaursen:feature/throttling
Apr 2, 2026
Merged
Changes from all commits
Commits
Show all changes
2 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,8 @@ | ||
| (library | ||
| (name xapi_rate_limit) | ||
| (public_name xapi-rate-limit) | ||
|
|
||
| (libraries threads.posix mtime mtime.clock.os xapi-log xapi-stdext-threads clock) | ||
| ) | ||
|
|
||
|
|
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,146 @@ | ||
| (* | ||
| * Copyright (C) 2025 Cloud Software Group | ||
| * | ||
| * This program is free software; you can redistribute it and/or modify | ||
| * it under the terms of the GNU Lesser General Public License as published | ||
| * by the Free Software Foundation; version 2.1 only. with the special | ||
| * exception on linking described in file LICENSE. | ||
| * | ||
| * This program is distributed in the hope that it will be useful, | ||
| * but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
| * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
| * GNU Lesser General Public License for more details. | ||
| *) | ||
|
|
||
| module D = Debug.Make (struct let name = __MODULE__ end) | ||
|
|
||
| type t = { | ||
| bucket: Token_bucket.t | ||
| ; process_queue: | ||
| (float * (unit -> unit)) Queue.t (* contains token cost and callback *) | ||
| ; process_queue_lock: Mutex.t | ||
| ; worker_thread_cond: Condition.t | ||
| ; should_terminate: bool Atomic.t | ||
| (* Signal termination to worker thread. The worker thread will | ||
| process all remaining items in the queue before exiting. *) | ||
| ; worker_thread: Thread.t | ||
| } | ||
|
|
||
| let with_lock = Xapi_stdext_threads.Threadext.Mutex.execute | ||
|
|
||
| (* The worker thread is responsible for calling the callback when the token | ||
| amount becomes available *) | ||
| let rec worker_loop ~bucket ~process_queue ~process_queue_lock | ||
| ~worker_thread_cond ~should_terminate = | ||
| let process_item cost callback = | ||
| Token_bucket.delay_then_consume bucket cost ; | ||
| callback () | ||
| in | ||
| let item_opt = | ||
| with_lock process_queue_lock (fun () -> | ||
| while | ||
| Queue.is_empty process_queue && not (Atomic.get should_terminate) | ||
| do | ||
| Condition.wait worker_thread_cond process_queue_lock | ||
| done ; | ||
| Queue.take_opt process_queue | ||
| ) | ||
| in | ||
| match item_opt with | ||
| | None -> | ||
| (* Queue is empty only when termination was signalled *) | ||
| D.debug "%s: queue empty in deleted rate limiter; exiting" __FUNCTION__ | ||
| | Some (cost, callback) -> | ||
| process_item cost callback ; | ||
| worker_loop ~bucket ~process_queue ~process_queue_lock ~worker_thread_cond | ||
| ~should_terminate | ||
|
|
||
| let create ~burst_size ~fill_rate = | ||
| let bucket = Token_bucket.create ~burst_size ~fill_rate in | ||
| let process_queue = Queue.create () in | ||
| let process_queue_lock = Mutex.create () in | ||
| let worker_thread_cond = Condition.create () in | ||
| let should_terminate = Atomic.make false in | ||
| let worker_thread = | ||
| Thread.create | ||
| (fun () -> | ||
| worker_loop ~bucket ~process_queue ~process_queue_lock | ||
| ~worker_thread_cond ~should_terminate | ||
| ) | ||
| () | ||
| in | ||
| { | ||
| bucket | ||
| ; process_queue | ||
| ; process_queue_lock | ||
| ; worker_thread_cond | ||
| ; should_terminate | ||
| ; worker_thread | ||
| } | ||
|
|
||
| let delete data = | ||
| if Atomic.compare_and_set data.should_terminate false true then | ||
| Condition.signal data.worker_thread_cond ; | ||
| Thread.join data.worker_thread | ||
|
|
||
| let check_not_terminated should_terminate = | ||
| if Atomic.get should_terminate then | ||
| invalid_arg "Rate_limit: submit called on a deleted rate limiter" | ||
|
|
||
| (* The callback should return quickly - if it is a longer task it is | ||
| responsible for creating a thread to do the task *) | ||
| let submit_async | ||
| { | ||
| bucket | ||
| ; process_queue | ||
| ; process_queue_lock | ||
| ; worker_thread_cond | ||
| ; should_terminate | ||
| ; _ | ||
| } ~callback amount = | ||
| check_not_terminated should_terminate ; | ||
| let run_immediately = | ||
| with_lock process_queue_lock (fun () -> | ||
| let immediate = | ||
| Queue.is_empty process_queue && Token_bucket.consume bucket amount | ||
| in | ||
| if not immediate then ( | ||
| Queue.add (amount, callback) process_queue ; | ||
| Condition.signal worker_thread_cond | ||
| ) ; | ||
| immediate | ||
| ) | ||
| in | ||
| if run_immediately then | ||
| callback () | ||
| else | ||
| D.debug "%s: rate limiting call" __FUNCTION__ | ||
|
|
||
| (* Block and execute on the same thread *) | ||
| let submit_sync bucket_data ~callback amount = | ||
| check_not_terminated bucket_data.should_terminate ; | ||
| let channel_opt = | ||
| with_lock bucket_data.process_queue_lock (fun () -> | ||
| if | ||
| Queue.is_empty bucket_data.process_queue | ||
| && Token_bucket.consume bucket_data.bucket amount | ||
| then | ||
| None | ||
| (* Can run callback immediately after releasing lock *) | ||
| else | ||
| (* Rate limited, need to retrieve function result via channel *) | ||
| let channel = Event.new_channel () in | ||
| Queue.add | ||
| (amount, fun () -> Event.sync (Event.send channel ())) | ||
| bucket_data.process_queue ; | ||
| Condition.signal bucket_data.worker_thread_cond ; | ||
| Some channel | ||
| ) | ||
| in | ||
| match channel_opt with | ||
| | None -> | ||
| callback () | ||
| | Some channel -> | ||
| D.debug "%s: rate limiting call" __FUNCTION__ ; | ||
| Event.sync (Event.receive channel) ; | ||
| callback () | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,47 @@ | ||
| (* | ||
| * Copyright (C) 2025 Cloud Software Group | ||
| * | ||
| * This program is free software; you can redistribute it and/or modify | ||
| * it under the terms of the GNU Lesser General Public License as published | ||
| * by the Free Software Foundation; version 2.1 only. with the special | ||
| * exception on linking described in file LICENSE. | ||
| * | ||
| * This program is distributed in the hope that it will be useful, | ||
| * but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
| * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
| * GNU Lesser General Public License for more details. | ||
| *) | ||
|
|
||
| (** Rate limiter built on top of a token bucket. Provides async and sync | ||
| submission of callbacks that are rate-limited. Uses a worker thread | ||
| to process queued callbacks when tokens become available. *) | ||
|
|
||
| type t | ||
|
|
||
| val create : burst_size:float -> fill_rate:float -> t | ||
| (** [create ~burst_size ~fill_rate] creates a new rate limiter with the given | ||
| token bucket parameters. | ||
| @raises Invalid_argument if the parameters are invalid | ||
| (e.g. non-positive fill rate). | ||
| @param burst_size Maximum number of tokens in the bucket | ||
| @param fill_rate Number of tokens added per second *) | ||
|
|
||
| val delete : t -> unit | ||
| (** [delete t] signals the worker thread to terminate. The worker thread | ||
| processes any remaining queued callbacks, then exits. Blocks the caller | ||
| until the worker thread has finished. Subsequent calls to [submit_async] | ||
| or [submit_sync] will raise [Invalid_argument]. *) | ||
|
|
||
| val submit_async : t -> callback:(unit -> unit) -> float -> unit | ||
| (** [submit_async t ~callback amount] submits a callback under rate limiting. | ||
| If tokens are immediately available and no callbacks are queued, the | ||
| callback runs synchronously on the calling thread. Otherwise it is | ||
| enqueued and will be executed by a worker thread when tokens become | ||
| available. Returns immediately. *) | ||
|
|
||
| val submit_sync : t -> callback:(unit -> 'a) -> float -> 'a | ||
| (** [submit_sync t ~callback amount] submits a callback under rate limiting | ||
| and blocks until it completes, returning the callback's result. If tokens | ||
| are immediately available and no callbacks are queued, the callback runs | ||
| directly. Otherwise, the caller blocks until the worker thread signals | ||
| that tokens are available. *) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,83 @@ | ||
| (* | ||
| * Copyright (C) 2025 Cloud Software Group | ||
| * | ||
| * This program is free software; you can redistribute it and/or modify | ||
| * it under the terms of the GNU Lesser General Public License as published | ||
| * by the Free Software Foundation; version 2.1 only. with the special | ||
| * exception on linking described in file LICENSE. | ||
| * | ||
| * This program is distributed in the hope that it will be useful, | ||
| * but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
| * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
| * GNU Lesser General Public License for more details. | ||
| *) | ||
|
|
||
| type state = {tokens: float; last_refill: Mtime.span} | ||
|
|
||
| type t = {burst_size: float; fill_rate: float; state: state Atomic.t} | ||
|
|
||
| let create_with_timestamp timestamp ~burst_size ~fill_rate = | ||
| if fill_rate <= 0. then | ||
| invalid_arg "Token_bucket.create: fill_rate must be positive" | ||
| else | ||
| let state = Atomic.make {tokens= burst_size; last_refill= timestamp} in | ||
| {burst_size; fill_rate; state} | ||
|
|
||
| let create = create_with_timestamp (Mtime_clock.elapsed ()) | ||
|
|
||
| let compute_tokens timestamp {tokens; last_refill} ~burst_size ~fill_rate = | ||
| let time_delta = Mtime.Span.abs_diff last_refill timestamp in | ||
| let time_delta_seconds = Mtime.Span.to_float_ns time_delta *. 1e-9 in | ||
| min burst_size (tokens +. (time_delta_seconds *. fill_rate)) | ||
|
|
||
| let peek_with_timestamp timestamp tb = | ||
| let tb_state = Atomic.get tb.state in | ||
| compute_tokens timestamp tb_state ~burst_size:tb.burst_size | ||
| ~fill_rate:tb.fill_rate | ||
|
|
||
| let peek tb = peek_with_timestamp (Mtime_clock.elapsed ()) tb | ||
|
|
||
| let consume_with_timestamp get_time tb amount = | ||
| let rec try_consume () = | ||
| let timestamp = get_time () in | ||
| let old_state = Atomic.get tb.state in | ||
| let new_tokens = | ||
| compute_tokens timestamp old_state ~burst_size:tb.burst_size | ||
| ~fill_rate:tb.fill_rate | ||
| in | ||
| let success, final_tokens = | ||
| if new_tokens >= amount then | ||
| (true, new_tokens -. amount) | ||
| else | ||
| (false, new_tokens) | ||
| in | ||
| let new_state = {tokens= final_tokens; last_refill= timestamp} in | ||
| if Atomic.compare_and_set tb.state old_state new_state then | ||
| success | ||
| else | ||
| try_consume () | ||
| in | ||
| try_consume () | ||
|
|
||
| let consume = consume_with_timestamp Mtime_clock.elapsed | ||
|
|
||
| let get_delay_until_available_timestamp timestamp tb amount = | ||
| let {tokens; last_refill} = Atomic.get tb.state in | ||
| let current_tokens = | ||
| compute_tokens timestamp {tokens; last_refill} ~burst_size:tb.burst_size | ||
| ~fill_rate:tb.fill_rate | ||
| in | ||
| let required_tokens = max 0. (amount -. current_tokens) in | ||
| required_tokens /. tb.fill_rate | ||
|
|
||
| let get_delay_until_available tb amount = | ||
| get_delay_until_available_timestamp (Mtime_clock.elapsed ()) tb amount | ||
|
|
||
| (* This implementation only works when there is only one thread trying to | ||
| consume - fairness needs to be implemented on top of it with a queue. | ||
| If there is no contention, it should only delay once. *) | ||
| let rec delay_then_consume tb amount = | ||
| if not (consume tb amount) then ( | ||
| Thread.delay (get_delay_until_available tb amount) ; | ||
| delay_then_consume tb amount | ||
| ) |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there an easy way to measure the duration an execution took and to log it? This way we could spot executions that take too long.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is a Xapi_stats module that could be used. Longer term the opentelemetry metrics feature could be used to expose more counters.
Meanwhile we already have opentelemetry spans that contain high-resolution begin/end timestamps, so when distributed tracing is enabled, this information can be extracted from there.
I think it'd be useful to create an opentelemetry span whenever rate-limiting kicks in, to make this more visible though. Perhaps @GabrielBuica could help with picking a good place to add distributed tracing spans in the rate limiting code.