@@ -22,7 +22,9 @@ use crate::devices::virtio::pmem::metrics::{PmemMetrics, PmemMetricsPerDevice};
2222use crate::devices::virtio::queue::{DescriptorChain, InvalidAvailIdx, Queue, QueueError};
2323use crate::devices::virtio::transport::{VirtioInterrupt, VirtioInterruptType};
2424use crate::logger::{IncMetric, error, info};
25+ use crate::rate_limiter::{BucketUpdate, RateLimiter, TokenType};
2526use crate::utils::{align_up, u64_to_usize};
27+ use crate::vmm_config::RateLimiterConfig;
2628use crate::vmm_config::pmem::PmemConfig;
2729use crate::vstate::memory::{ByteValued, Bytes, GuestMemoryMmap, GuestMmapRegion};
2830use crate::vstate::vm::VmError;
@@ -58,6 +60,8 @@ pub enum PmemError {
5860 Queue(#[from] QueueError),
5961 /// Error during obtaining the descriptor from the queue: {0}
6062 QueuePop(#[from] InvalidAvailIdx),
63+ /// Error creating rate limiter: {0}
64+ RateLimiter(std::io::Error),
6165}
6266
6367const VIRTIO_PMEM_REQ_TYPE_FLUSH: u32 = 0;
@@ -94,6 +98,7 @@ pub struct Pmem {
9498 pub file_len: u64,
9599 pub mmap_ptr: u64,
96100 pub metrics: Arc<PmemMetrics>,
101+ pub rate_limiter: RateLimiter,
97102
98103 pub config: PmemConfig,
99104}
@@ -125,6 +130,13 @@ impl Pmem {
125130 let (file, file_len, mmap_ptr, mmap_len) =
126131 Self::mmap_backing_file(&config.path_on_host, config.read_only)?;
127132
133+ let rate_limiter = config
134+ .rate_limiter
135+ .map(RateLimiterConfig::try_into)
136+ .transpose()
137+ .map_err(PmemError::RateLimiter)?
138+ .unwrap_or_default();
139+
128140 Ok(Self {
129141 avail_features: 1u64 << VIRTIO_F_VERSION_1,
130142 acked_features: 0u64,
@@ -140,6 +152,7 @@ impl Pmem {
140152 file_len,
141153 mmap_ptr,
142154 metrics: PmemMetricsPerDevice::alloc(config.id.clone()),
155+ rate_limiter,
143156 config,
144157 })
145158 }
@@ -254,6 +267,26 @@ impl Pmem {
254267 // This is safe since we checked in the event handler that the device is activated.
255268 let active_state = self.device_state.active_state().unwrap();
256269
270+ if self.queues[0].is_empty() {
271+ return Ok(());
272+ }
273+
274+ // There is only 1 type of request pmem supports, so we can consume
275+ // rate-limiter before even looking at the queue. This is still valid
276+ // even if the queue will not have any valid requests since it indicate
277+ // broken guest driver and rate-limiting should still apply for such case.
278+ // Rate limit: consume 1 op and file_len bytes for the coalesced msync.
279+ // If the rate limiter is blocked, defer notification until the timer fires.
280+ if !self.rate_limiter.consume(1, TokenType::Ops) {
281+ self.metrics.rate_limiter_throttled_events.inc();
282+ return Ok(());
283+ }
284+ if !self.rate_limiter.consume(self.file_len, TokenType::Bytes) {
285+ self.rate_limiter.manual_replenish(1, TokenType::Ops);
286+ self.metrics.rate_limiter_throttled_events.inc();
287+ return Ok(());
288+ }
289+
257290 let mut cached_result = None;
258291 while let Some(head) = self.queues[0].pop()? {
259292 let add_result = match self.process_chain(head, &mut cached_result) {
@@ -270,8 +303,8 @@ impl Pmem {
270303 break;
271304 }
272305 }
273- self.queues[0].advance_used_ring_idx();
274306
307+ self.queues[0].advance_used_ring_idx();
275308 if self.queues[0].prepare_kick() {
276309 active_state
277310 .interrupt
@@ -347,6 +380,11 @@ impl Pmem {
347380 Ok(())
348381 }
349382
383+ /// Updates the parameters for the rate limiter.
384+ pub fn update_rate_limiter(&mut self, bytes: BucketUpdate, ops: BucketUpdate) {
385+ self.rate_limiter.update_buckets(bytes, ops);
386+ }
387+
350388 pub fn process_queue(&mut self) {
351389 self.metrics.queue_event_count.inc();
352390 if let Err(err) = self.queue_events[0].read() {
@@ -355,6 +393,25 @@ impl Pmem {
355393 return;
356394 }
357395
396+ if self.rate_limiter.is_blocked() {
397+ self.metrics.rate_limiter_throttled_events.inc();
398+ return;
399+ }
400+
401+ self.handle_queue().unwrap_or_else(|err| {
402+ error!("pmem: {err:?}");
403+ self.metrics.event_fails.inc();
404+ });
405+ }
406+
407+ pub fn process_rate_limiter_event(&mut self) {
408+ self.metrics.rate_limiter_event_count.inc();
409+ if let Err(err) = self.rate_limiter.event_handler() {
410+ error!("pmem: Failed to get rate-limiter event: {err:?}");
411+ self.metrics.event_fails.inc();
412+ return;
413+ }
414+
358415 self.handle_queue().unwrap_or_else(|err| {
359416 error!("pmem: {err:?}");
360417 self.metrics.event_fails.inc();
@@ -458,6 +515,7 @@ mod tests {
458515 path_on_host: "not_a_path".into(),
459516 root_device: true,
460517 read_only: false,
518+ ..Default::default()
461519 };
462520 assert!(matches!(
463521 Pmem::new(config).unwrap_err(),
@@ -471,6 +529,7 @@ mod tests {
471529 path_on_host: dummy_path.clone(),
472530 root_device: true,
473531 read_only: false,
532+ ..Default::default()
474533 };
475534 assert!(matches!(
476535 Pmem::new(config).unwrap_err(),
@@ -483,6 +542,7 @@ mod tests {
483542 path_on_host: dummy_path,
484543 root_device: true,
485544 read_only: false,
545+ ..Default::default()
486546 };
487547 Pmem::new(config).unwrap();
488548 }
@@ -497,6 +557,7 @@ mod tests {
497557 path_on_host: dummy_path,
498558 root_device: true,
499559 read_only: false,
560+ ..Default::default()
500561 };
501562 let mut pmem = Pmem::new(config).unwrap();
502563
0 commit comments