@@ -22,7 +22,9 @@ use crate::devices::virtio::pmem::metrics::{PmemMetrics, PmemMetricsPerDevice};
2222use crate :: devices:: virtio:: queue:: { DescriptorChain , InvalidAvailIdx , Queue , QueueError } ;
2323use crate :: devices:: virtio:: transport:: { VirtioInterrupt , VirtioInterruptType } ;
2424use crate :: logger:: { IncMetric , error, info} ;
25+ use crate :: rate_limiter:: { BucketUpdate , RateLimiter , TokenType } ;
2526use crate :: utils:: { align_up, u64_to_usize} ;
27+ use crate :: vmm_config:: RateLimiterConfig ;
2628use crate :: vmm_config:: pmem:: PmemConfig ;
2729use crate :: vstate:: memory:: { ByteValued , Bytes , GuestMemoryMmap , GuestMmapRegion } ;
2830use crate :: vstate:: vm:: VmError ;
@@ -58,6 +60,8 @@ pub enum PmemError {
5860 Queue ( #[ from] QueueError ) ,
5961 /// Error during obtaining the descriptor from the queue: {0}
6062 QueuePop ( #[ from] InvalidAvailIdx ) ,
63+ /// Error creating rate limiter: {0}
64+ RateLimiter ( std:: io:: Error ) ,
6165}
6266
6367const VIRTIO_PMEM_REQ_TYPE_FLUSH : u32 = 0 ;
@@ -94,6 +98,7 @@ pub struct Pmem {
9498 pub file_len : u64 ,
9599 pub mmap_ptr : u64 ,
96100 pub metrics : Arc < PmemMetrics > ,
101+ pub rate_limiter : RateLimiter ,
97102
98103 pub config : PmemConfig ,
99104}
@@ -125,6 +130,13 @@ impl Pmem {
125130 let ( file, file_len, mmap_ptr, mmap_len) =
126131 Self :: mmap_backing_file ( & config. path_on_host , config. read_only ) ?;
127132
133+ let rate_limiter = config
134+ . rate_limiter
135+ . map ( RateLimiterConfig :: try_into)
136+ . transpose ( )
137+ . map_err ( PmemError :: RateLimiter ) ?
138+ . unwrap_or_default ( ) ;
139+
128140 Ok ( Self {
129141 avail_features : 1u64 << VIRTIO_F_VERSION_1 ,
130142 acked_features : 0u64 ,
@@ -140,6 +152,7 @@ impl Pmem {
140152 file_len,
141153 mmap_ptr,
142154 metrics : PmemMetricsPerDevice :: alloc ( config. id . clone ( ) ) ,
155+ rate_limiter,
143156 config,
144157 } )
145158 }
@@ -254,6 +267,26 @@ impl Pmem {
254267 // This is safe since we checked in the event handler that the device is activated.
255268 let active_state = self . device_state . active_state ( ) . unwrap ( ) ;
256269
270+ if self . queues [ 0 ] . is_empty ( ) {
271+ return Ok ( ( ) ) ;
272+ }
273+
274+ // There is only 1 type of request pmem supports, so we can consume
275+ // rate-limiter before even looking at the queue. This is still valid
276+ // even if the queue will not have any valid requests since it indicate
277+ // broken guest driver and rate-limiting should still apply for such case.
278+ // Rate limit: consume 1 op and file_len bytes for the coalesced msync.
279+ // If the rate limiter is blocked, defer notification until the timer fires.
280+ if !self . rate_limiter . consume ( 1 , TokenType :: Ops ) {
281+ self . metrics . rate_limiter_throttled_events . inc ( ) ;
282+ return Ok ( ( ) ) ;
283+ }
284+ if !self . rate_limiter . consume ( self . file_len , TokenType :: Bytes ) {
285+ self . rate_limiter . manual_replenish ( 1 , TokenType :: Ops ) ;
286+ self . metrics . rate_limiter_throttled_events . inc ( ) ;
287+ return Ok ( ( ) ) ;
288+ }
289+
257290 let mut cached_result = None ;
258291 while let Some ( head) = self . queues [ 0 ] . pop ( ) ? {
259292 let add_result = match self . process_chain ( head, & mut cached_result) {
@@ -270,8 +303,8 @@ impl Pmem {
270303 break ;
271304 }
272305 }
273- self . queues [ 0 ] . advance_used_ring_idx ( ) ;
274306
307+ self . queues [ 0 ] . advance_used_ring_idx ( ) ;
275308 if self . queues [ 0 ] . prepare_kick ( ) {
276309 active_state
277310 . interrupt
@@ -347,6 +380,11 @@ impl Pmem {
347380 Ok ( ( ) )
348381 }
349382
383+ /// Updates the parameters for the rate limiter.
384+ pub fn update_rate_limiter ( & mut self , bytes : BucketUpdate , ops : BucketUpdate ) {
385+ self . rate_limiter . update_buckets ( bytes, ops) ;
386+ }
387+
350388 pub fn process_queue ( & mut self ) {
351389 self . metrics . queue_event_count . inc ( ) ;
352390 if let Err ( err) = self . queue_events [ 0 ] . read ( ) {
@@ -355,6 +393,25 @@ impl Pmem {
355393 return ;
356394 }
357395
396+ if self . rate_limiter . is_blocked ( ) {
397+ self . metrics . rate_limiter_throttled_events . inc ( ) ;
398+ return ;
399+ }
400+
401+ self . handle_queue ( ) . unwrap_or_else ( |err| {
402+ error ! ( "pmem: {err:?}" ) ;
403+ self . metrics . event_fails . inc ( ) ;
404+ } ) ;
405+ }
406+
407+ pub fn process_rate_limiter_event ( & mut self ) {
408+ self . metrics . rate_limiter_event_count . inc ( ) ;
409+ if let Err ( err) = self . rate_limiter . event_handler ( ) {
410+ error ! ( "pmem: Failed to get rate-limiter event: {err:?}" ) ;
411+ self . metrics . event_fails . inc ( ) ;
412+ return ;
413+ }
414+
358415 self . handle_queue ( ) . unwrap_or_else ( |err| {
359416 error ! ( "pmem: {err:?}" ) ;
360417 self . metrics . event_fails . inc ( ) ;
@@ -458,6 +515,7 @@ mod tests {
458515 path_on_host : "not_a_path" . into ( ) ,
459516 root_device : true ,
460517 read_only : false ,
518+ ..Default :: default ( )
461519 } ;
462520 assert ! ( matches!(
463521 Pmem :: new( config) . unwrap_err( ) ,
@@ -471,6 +529,7 @@ mod tests {
471529 path_on_host : dummy_path. clone ( ) ,
472530 root_device : true ,
473531 read_only : false ,
532+ ..Default :: default ( )
474533 } ;
475534 assert ! ( matches!(
476535 Pmem :: new( config) . unwrap_err( ) ,
@@ -483,6 +542,7 @@ mod tests {
483542 path_on_host : dummy_path,
484543 root_device : true ,
485544 read_only : false ,
545+ ..Default :: default ( )
486546 } ;
487547 Pmem :: new ( config) . unwrap ( ) ;
488548 }
@@ -497,6 +557,7 @@ mod tests {
497557 path_on_host : dummy_path,
498558 root_device : true ,
499559 read_only : false ,
560+ ..Default :: default ( )
500561 } ;
501562 let mut pmem = Pmem :: new ( config) . unwrap ( ) ;
502563
0 commit comments