Skip to content

Commit 05a34ac

Browse files
Lethe10137BobAnkh
authored andcommitted
feat(cell): wait for notify before start
1 parent ebd67e7 commit 05a34ac

10 files changed

Lines changed: 71 additions & 187 deletions

File tree

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ tracing = "0.1.37"
4848
tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
4949

5050
[features]
51+
default = ["first-packet"]
52+
first-packet = ["rattan-core/first-packet"]
5153
http = ["rattan-core/http"]
5254
packet-dump = ["rattan-core/packet-dump"]
5355

rattan-core/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ harness = false
7474

7575
[features]
7676
default = []
77+
first-packet = []
7778
http = ["serde", "dep:axum"]
7879
packet-dump = []
7980
serde = ["dep:serde", "dep:serde_json", "bandwidth/serde", "bytesize/serde", "rand_distr/serde"]

rattan-core/src/cells/bandwidth/mod.rs

Lines changed: 10 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1+
use super::TRACE_START_INSTANT;
12
use crate::cells::bandwidth::queue::PacketQueue;
23
use crate::cells::{Cell, Packet};
3-
use crate::core::CALIBRATED_START_INSTANT;
44
use crate::error::Error;
55
use crate::metal::timer::Timer;
66
use async_trait::async_trait;
@@ -148,23 +148,7 @@ where
148148
{
149149
async fn dequeue(&mut self) -> Option<P> {
150150
// Wait for Start notify if not started yet
151-
if !self.started {
152-
if let Some(notify_rx) = &mut self.notify_rx {
153-
match notify_rx.recv().await {
154-
Ok(crate::control::RattanNotify::Start) => {
155-
self.change_state(2);
156-
self.started = true;
157-
}
158-
Ok(crate::control::RattanNotify::FirstPacket) => {
159-
// Continue waiting for Start notify
160-
}
161-
Err(_) => {
162-
// Notify channel closed, exit
163-
return None;
164-
}
165-
}
166-
}
167-
}
151+
crate::wait_until_started!(self, Start);
168152

169153
// wait until next_available
170154
loop {
@@ -517,25 +501,11 @@ where
517501
Q: PacketQueue<P>,
518502
{
519503
async fn dequeue(&mut self) -> Option<P> {
520-
// Wait for Start notify if not started yet
521-
if !self.started {
522-
if let Some(notify_rx) = &mut self.notify_rx {
523-
match notify_rx.recv().await {
524-
Ok(crate::control::RattanNotify::Start) => {
525-
self.reset();
526-
self.change_state(2);
527-
self.started = true;
528-
}
529-
Ok(crate::control::RattanNotify::FirstPacket) => {
530-
// Continue waiting for Start notify
531-
}
532-
Err(_) => {
533-
// Notify channel closed, exit
534-
return None;
535-
}
536-
}
537-
}
538-
}
504+
// Wait for FirstPacket notify if not started yet
505+
#[cfg(feature = "first-packet")]
506+
crate::wait_until_started!(self, FirstPacket);
507+
#[cfg(not(feature = "first-packet"))]
508+
crate::wait_until_started!(self, Start);
539509

540510
// wait until next_available
541511
loop {
@@ -551,7 +521,7 @@ where
551521
return None;
552522
}
553523
}
554-
recv_packet = self.egress.recv() => {
524+
recv_packet = self.egress.recv() => {
555525
match recv_packet {
556526
Some(new_packet) => {
557527
match self.state.load(std::sync::atomic::Ordering::Acquire) {
@@ -633,8 +603,8 @@ where
633603

634604
// This must be called before any dequeue
635605
fn reset(&mut self) {
636-
self.next_available = *CALIBRATED_START_INSTANT.get_or_init(Instant::now);
637-
self.next_change = *CALIBRATED_START_INSTANT.get_or_init(Instant::now);
606+
self.next_available = *TRACE_START_INSTANT.get_or_init(Instant::now);
607+
self.next_change = *TRACE_START_INSTANT.get_or_init(Instant::now);
638608
}
639609

640610
fn change_state(&self, state: i32) {

rattan-core/src/cells/delay.rs

Lines changed: 9 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1+
use super::TRACE_START_INSTANT;
12
use crate::cells::{Cell, Packet};
2-
use crate::core::CALIBRATED_START_INSTANT;
33
use crate::error::Error;
44
use crate::metal::timer::Timer;
55
use async_trait::async_trait;
@@ -81,24 +81,7 @@ where
8181
{
8282
async fn dequeue(&mut self) -> Option<P> {
8383
// Wait for Start notify if not started yet
84-
if !self.started {
85-
if let Some(notify_rx) = &mut self.notify_rx {
86-
match notify_rx.recv().await {
87-
Ok(crate::control::RattanNotify::Start) => {
88-
self.reset();
89-
self.change_state(2);
90-
self.started = true;
91-
}
92-
Ok(crate::control::RattanNotify::FirstPacket) => {
93-
// Continue waiting for Start notify
94-
}
95-
Err(_) => {
96-
// Notify channel closed, exit
97-
return None;
98-
}
99-
}
100-
}
101-
}
84+
crate::wait_until_started!(self, Start);
10285

10386
let packet = match self.egress.recv().await {
10487
Some(packet) => packet,
@@ -307,25 +290,11 @@ where
307290
P: Packet + Send + Sync,
308291
{
309292
async fn dequeue(&mut self) -> Option<P> {
310-
// Wait for Start notify if not started yet
311-
if !self.started {
312-
if let Some(notify_rx) = &mut self.notify_rx {
313-
match notify_rx.recv().await {
314-
Ok(crate::control::RattanNotify::Start) => {
315-
self.reset();
316-
self.change_state(2);
317-
self.started = true;
318-
}
319-
Ok(crate::control::RattanNotify::FirstPacket) => {
320-
// Continue waiting for Start notify
321-
}
322-
Err(_) => {
323-
// Notify channel closed, exit
324-
return None;
325-
}
326-
}
327-
}
328-
}
293+
// Wait for FirstPacket notify if not started yet
294+
#[cfg(feature = "first-packet")]
295+
crate::wait_until_started!(self, FirstPacket);
296+
#[cfg(not(feature = "first-packet"))]
297+
crate::wait_until_started!(self, Start);
329298

330299
let packet = loop {
331300
tokio::select! {
@@ -376,8 +345,8 @@ where
376345

377346
// This must be called before any dequeue
378347
fn reset(&mut self) {
379-
self.next_available = *CALIBRATED_START_INSTANT.get_or_init(Instant::now);
380-
self.next_change = *CALIBRATED_START_INSTANT.get_or_init(Instant::now);
348+
self.next_available = *TRACE_START_INSTANT.get_or_init(Instant::now);
349+
self.next_change = *TRACE_START_INSTANT.get_or_init(Instant::now);
381350
}
382351

383352
fn change_state(&self, state: i32) {

rattan-core/src/cells/loss.rs

Lines changed: 8 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1+
use super::TRACE_START_INSTANT;
12
use crate::cells::{Cell, Packet};
2-
use crate::core::CALIBRATED_START_INSTANT;
33
use crate::error::Error;
44
use crate::metal::timer::Timer;
55
use crate::utils::sync::AtomicRawCell;
@@ -73,23 +73,7 @@ where
7373
{
7474
async fn dequeue(&mut self) -> Option<P> {
7575
// Wait for Start notify if not started yet
76-
if !self.started {
77-
if let Some(notify_rx) = &mut self.notify_rx {
78-
match notify_rx.recv().await {
79-
Ok(crate::control::RattanNotify::Start) => {
80-
self.change_state(2);
81-
self.started = true;
82-
}
83-
Ok(crate::control::RattanNotify::FirstPacket) => {
84-
// Continue waiting for Start notify
85-
}
86-
Err(_) => {
87-
// Notify channel closed, exit
88-
return None;
89-
}
90-
}
91-
}
92-
}
76+
crate::wait_until_started!(self, Start);
9377

9478
let packet = match self.egress.recv().await {
9579
Some(packet) => packet,
@@ -299,25 +283,11 @@ where
299283
R: Rng + Send + Sync,
300284
{
301285
async fn dequeue(&mut self) -> Option<P> {
302-
// Wait for Start notify if not started yet
303-
if !self.started {
304-
if let Some(notify_rx) = &mut self.notify_rx {
305-
match notify_rx.recv().await {
306-
Ok(crate::control::RattanNotify::Start) => {
307-
self.reset();
308-
self.change_state(2);
309-
self.started = true;
310-
}
311-
Ok(crate::control::RattanNotify::FirstPacket) => {
312-
// Continue waiting for Start notify
313-
}
314-
Err(_) => {
315-
// Notify channel closed, exit
316-
return None;
317-
}
318-
}
319-
}
320-
}
286+
// Wait for FirstPacket notify if not started yet
287+
#[cfg(feature = "first-packet")]
288+
crate::wait_until_started!(self, FirstPacket);
289+
#[cfg(not(feature = "first-packet"))]
290+
crate::wait_until_started!(self, Start);
321291

322292
let packet = loop {
323293
tokio::select! {
@@ -360,7 +330,7 @@ where
360330

361331
fn reset(&mut self) {
362332
self.prev_loss = 0;
363-
self.next_change = *CALIBRATED_START_INSTANT.get_or_init(Instant::now);
333+
self.next_change = *TRACE_START_INSTANT.get_or_init(Instant::now);
364334
}
365335

366336
fn change_state(&self, state: i32) {

rattan-core/src/cells/mod.rs

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -325,3 +325,40 @@ where
325325
fn into_receiver(self) -> Self::EgressType;
326326
fn control_interface(&self) -> Arc<Self::ControlInterfaceType>;
327327
}
328+
329+
/// Called at the start of the cell's dequeue() method. Make sure that no packets shall be dequeued
330+
/// until an expected Notification has been received ONCE.
331+
#[macro_export]
332+
macro_rules! wait_until_started {
333+
($self:ident, $variant:ident) => {
334+
while !$self.started {
335+
if let Some(notify_rx) = &mut $self.notify_rx {
336+
match notify_rx.recv().await {
337+
Ok($crate::control::RattanNotify::$variant) => {
338+
$self.reset();
339+
$self.change_state(2);
340+
$self.started = true;
341+
}
342+
Ok(_) => {
343+
// Ignore unexpected notifications.
344+
continue;
345+
}
346+
Err(_) => {
347+
// This happens when the notifier is dropped.
348+
return None;
349+
}
350+
}
351+
} else {
352+
// The notifier is not set unless the normal startup of Rattan has taken place. In some
353+
// non-integrated environments, the notifier may not be set, like unit tests for cells.
354+
break;
355+
}
356+
}
357+
};
358+
}
359+
360+
/// Cells that replay a trace should refer to TRACE_START_INSTANT as the logical start instant of the trace.
361+
#[cfg(not(feature = "first-packet"))]
362+
pub use crate::core::CALIBRATED_START_INSTANT as TRACE_START_INSTANT;
363+
#[cfg(feature = "first-packet")]
364+
pub use crate::core::FIRST_PACKET_INSTANT as TRACE_START_INSTANT;

rattan-core/src/cells/per_packet/delay.rs

Lines changed: 1 addition & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -75,23 +75,7 @@ where
7575
{
7676
async fn dequeue(&mut self) -> Option<P> {
7777
// Wait for Start notify if not started yet
78-
if !self.started {
79-
if let Some(notify_rx) = &mut self.notify_rx {
80-
match notify_rx.recv().await {
81-
Ok(crate::control::RattanNotify::Start) => {
82-
self.change_state(2);
83-
self.started = true;
84-
}
85-
Ok(crate::control::RattanNotify::FirstPacket) => {
86-
// Continue waiting for Start notify
87-
}
88-
Err(_) => {
89-
// Notify channel closed, exit
90-
return None;
91-
}
92-
}
93-
}
94-
}
78+
crate::wait_until_started!(self, Start);
9579

9680
// wait until next_available
9781
loop {

rattan-core/src/cells/shadow.rs

Lines changed: 1 addition & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -44,23 +44,7 @@ where
4444
{
4545
async fn dequeue(&mut self) -> Option<P> {
4646
// Wait for Start notify if not started yet
47-
if !self.started {
48-
if let Some(notify_rx) = &mut self.notify_rx {
49-
match notify_rx.recv().await {
50-
Ok(crate::control::RattanNotify::Start) => {
51-
self.change_state(2);
52-
self.started = true;
53-
}
54-
Ok(crate::control::RattanNotify::FirstPacket) => {
55-
// Continue waiting for Start notify
56-
}
57-
Err(_) => {
58-
// Notify channel closed, exit
59-
return None;
60-
}
61-
}
62-
}
63-
}
47+
crate::wait_until_started!(self, Start);
6448

6549
match self.state.load(std::sync::atomic::Ordering::Acquire) {
6650
0 => None,

rattan-core/src/cells/spy.rs

Lines changed: 1 addition & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -123,23 +123,7 @@ where
123123
{
124124
async fn dequeue(&mut self) -> Option<P> {
125125
// Wait for Start notify if not started yet
126-
if !self.started {
127-
if let Some(notify_rx) = &mut self.notify_rx {
128-
match notify_rx.recv().await {
129-
Ok(crate::control::RattanNotify::Start) => {
130-
self.change_state(2);
131-
self.started = true;
132-
}
133-
Ok(crate::control::RattanNotify::FirstPacket) => {
134-
// Continue waiting for Start notify
135-
}
136-
Err(_) => {
137-
// Notify channel closed, exit
138-
return None;
139-
}
140-
}
141-
}
142-
}
126+
crate::wait_until_started!(self, Start);
143127

144128
loop {
145129
tokio::select! {

rattan-core/src/cells/token_bucket.rs

Lines changed: 1 addition & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -201,24 +201,7 @@ where
201201
{
202202
async fn dequeue(&mut self) -> Option<P> {
203203
// Wait for Start notify if not started yet
204-
if !self.started {
205-
if let Some(notify_rx) = &mut self.notify_rx {
206-
match notify_rx.recv().await {
207-
Ok(crate::control::RattanNotify::Start) => {
208-
self.reset();
209-
self.change_state(2);
210-
self.started = true;
211-
}
212-
Ok(crate::control::RattanNotify::FirstPacket) => {
213-
// Continue waiting for Start notify
214-
}
215-
Err(_) => {
216-
// Notify channel closed, exit
217-
return None;
218-
}
219-
}
220-
}
221-
}
204+
crate::wait_until_started!(self, Start);
222205

223206
// wait until next_available
224207
loop {

0 commit comments

Comments
 (0)