Skip to content

Commit ebd67e7

Browse files
committed
feat(cell): add notify channel to cell
1 parent 4af1a79 commit ebd67e7

10 files changed

Lines changed: 330 additions & 24 deletions

File tree

rattan-core/src/cells/bandwidth/mod.rs

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,8 @@ where
9696
config_rx: mpsc::UnboundedReceiver<BwCellConfig<P, Q>>,
9797
timer: Timer,
9898
state: AtomicI32,
99+
notify_rx: Option<tokio::sync::broadcast::Receiver<crate::control::RattanNotify>>,
100+
started: bool,
99101
}
100102

101103
impl<P, Q> BwCellEgress<P, Q>
@@ -145,6 +147,25 @@ where
145147
Q: PacketQueue<P>,
146148
{
147149
async fn dequeue(&mut self) -> Option<P> {
150+
// Wait for Start notify if not started yet
151+
if !self.started {
152+
if let Some(notify_rx) = &mut self.notify_rx {
153+
match notify_rx.recv().await {
154+
Ok(crate::control::RattanNotify::Start) => {
155+
self.change_state(2);
156+
self.started = true;
157+
}
158+
Ok(crate::control::RattanNotify::FirstPacket) => {
159+
// Continue waiting for Start notify
160+
}
161+
Err(_) => {
162+
// Notify channel closed, exit
163+
return None;
164+
}
165+
}
166+
}
167+
}
168+
148169
// wait until next_available
149170
loop {
150171
tokio::select! {
@@ -229,6 +250,13 @@ where
229250
self.state
230251
.store(state, std::sync::atomic::Ordering::Release);
231252
}
253+
254+
fn set_notify_receiver(
255+
&mut self,
256+
notify_rx: tokio::sync::broadcast::Receiver<crate::control::RattanNotify>,
257+
) {
258+
self.notify_rx = Some(notify_rx);
259+
}
232260
}
233261

234262
#[cfg_attr(
@@ -380,6 +408,8 @@ where
380408
config_rx,
381409
timer: Timer::new()?,
382410
state: AtomicI32::new(0),
411+
notify_rx: None,
412+
started: false,
383413
},
384414
control_interface: Arc::new(BwCellControlInterface { config_tx }),
385415
})
@@ -404,6 +434,8 @@ where
404434
send_timer: Timer,
405435
change_timer: Timer,
406436
state: AtomicI32,
437+
notify_rx: Option<tokio::sync::broadcast::Receiver<crate::control::RattanNotify>>,
438+
started: bool,
407439
}
408440

409441
impl<P, Q> BwReplayCellEgress<P, Q>
@@ -485,6 +517,26 @@ where
485517
Q: PacketQueue<P>,
486518
{
487519
async fn dequeue(&mut self) -> Option<P> {
520+
// Wait for Start notify if not started yet
521+
if !self.started {
522+
if let Some(notify_rx) = &mut self.notify_rx {
523+
match notify_rx.recv().await {
524+
Ok(crate::control::RattanNotify::Start) => {
525+
self.reset();
526+
self.change_state(2);
527+
self.started = true;
528+
}
529+
Ok(crate::control::RattanNotify::FirstPacket) => {
530+
// Continue waiting for Start notify
531+
}
532+
Err(_) => {
533+
// Notify channel closed, exit
534+
return None;
535+
}
536+
}
537+
}
538+
}
539+
488540
// wait until next_available
489541
loop {
490542
tokio::select! {
@@ -589,6 +641,13 @@ where
589641
self.state
590642
.store(state, std::sync::atomic::Ordering::Release);
591643
}
644+
645+
fn set_notify_receiver(
646+
&mut self,
647+
notify_rx: tokio::sync::broadcast::Receiver<crate::control::RattanNotify>,
648+
) {
649+
self.notify_rx = Some(notify_rx);
650+
}
592651
}
593652

594653
#[cfg_attr(
@@ -748,6 +807,8 @@ where
748807
send_timer: Timer::new()?,
749808
change_timer: Timer::new()?,
750809
state: AtomicI32::new(0),
810+
notify_rx: None,
811+
started: false,
751812
},
752813
control_interface: Arc::new(BwReplayCellControlInterface { config_tx }),
753814
})

rattan-core/src/cells/delay.rs

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@ where
5656
config_rx: mpsc::UnboundedReceiver<DelayCellConfig>,
5757
timer: Timer,
5858
state: AtomicI32,
59+
notify_rx: Option<tokio::sync::broadcast::Receiver<crate::control::RattanNotify>>,
60+
started: bool,
5961
}
6062

6163
impl<P> DelayCellEgress<P>
@@ -78,6 +80,26 @@ where
7880
P: Packet + Send + Sync,
7981
{
8082
async fn dequeue(&mut self) -> Option<P> {
83+
// Wait for Start notify if not started yet
84+
if !self.started {
85+
if let Some(notify_rx) = &mut self.notify_rx {
86+
match notify_rx.recv().await {
87+
Ok(crate::control::RattanNotify::Start) => {
88+
self.reset();
89+
self.change_state(2);
90+
self.started = true;
91+
}
92+
Ok(crate::control::RattanNotify::FirstPacket) => {
93+
// Continue waiting for Start notify
94+
}
95+
Err(_) => {
96+
// Notify channel closed, exit
97+
return None;
98+
}
99+
}
100+
}
101+
}
102+
81103
let packet = match self.egress.recv().await {
82104
Some(packet) => packet,
83105
None => return None,
@@ -109,6 +131,13 @@ where
109131
self.state
110132
.store(state, std::sync::atomic::Ordering::Release);
111133
}
134+
135+
fn set_notify_receiver(
136+
&mut self,
137+
notify_rx: tokio::sync::broadcast::Receiver<crate::control::RattanNotify>,
138+
) {
139+
self.notify_rx = Some(notify_rx);
140+
}
112141
}
113142

114143
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
@@ -190,6 +219,8 @@ where
190219
config_rx,
191220
timer: Timer::new()?,
192221
state: AtomicI32::new(0),
222+
notify_rx: None,
223+
started: false,
193224
},
194225
control_interface: Arc::new(DelayCellControlInterface { config_tx }),
195226
})
@@ -211,6 +242,8 @@ where
211242
send_timer: Timer,
212243
change_timer: Timer,
213244
state: AtomicI32,
245+
notify_rx: Option<tokio::sync::broadcast::Receiver<crate::control::RattanNotify>>,
246+
started: bool,
214247
}
215248

216249
impl<P> DelayReplayCellEgress<P>
@@ -274,6 +307,26 @@ where
274307
P: Packet + Send + Sync,
275308
{
276309
async fn dequeue(&mut self) -> Option<P> {
310+
// Wait for Start notify if not started yet
311+
if !self.started {
312+
if let Some(notify_rx) = &mut self.notify_rx {
313+
match notify_rx.recv().await {
314+
Ok(crate::control::RattanNotify::Start) => {
315+
self.reset();
316+
self.change_state(2);
317+
self.started = true;
318+
}
319+
Ok(crate::control::RattanNotify::FirstPacket) => {
320+
// Continue waiting for Start notify
321+
}
322+
Err(_) => {
323+
// Notify channel closed, exit
324+
return None;
325+
}
326+
}
327+
}
328+
}
329+
277330
let packet = loop {
278331
tokio::select! {
279332
biased;
@@ -331,6 +384,13 @@ where
331384
self.state
332385
.store(state, std::sync::atomic::Ordering::Release);
333386
}
387+
388+
fn set_notify_receiver(
389+
&mut self,
390+
notify_rx: tokio::sync::broadcast::Receiver<crate::control::RattanNotify>,
391+
) {
392+
self.notify_rx = Some(notify_rx);
393+
}
334394
}
335395

336396
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
@@ -421,6 +481,8 @@ where
421481
send_timer: Timer::new()?,
422482
change_timer: Timer::new()?,
423483
state: AtomicI32::new(0),
484+
notify_rx: None,
485+
started: false,
424486
},
425487
control_interface: Arc::new(DelayReplayCellControlInterface { config_tx }),
426488
})

rattan-core/src/cells/loss.rs

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ where
6161
prev_loss: usize,
6262
rng: R,
6363
state: AtomicI32,
64+
notify_rx: Option<tokio::sync::broadcast::Receiver<crate::control::RattanNotify>>,
65+
started: bool,
6466
}
6567

6668
#[async_trait]
@@ -70,6 +72,25 @@ where
7072
R: Rng + Send + Sync,
7173
{
7274
async fn dequeue(&mut self) -> Option<P> {
75+
// Wait for Start notify if not started yet
76+
if !self.started {
77+
if let Some(notify_rx) = &mut self.notify_rx {
78+
match notify_rx.recv().await {
79+
Ok(crate::control::RattanNotify::Start) => {
80+
self.change_state(2);
81+
self.started = true;
82+
}
83+
Ok(crate::control::RattanNotify::FirstPacket) => {
84+
// Continue waiting for Start notify
85+
}
86+
Err(_) => {
87+
// Notify channel closed, exit
88+
return None;
89+
}
90+
}
91+
}
92+
}
93+
7394
let packet = match self.egress.recv().await {
7495
Some(packet) => packet,
7596
None => return None,
@@ -105,6 +126,13 @@ where
105126
self.state
106127
.store(state, std::sync::atomic::Ordering::Release);
107128
}
129+
130+
fn set_notify_receiver(
131+
&mut self,
132+
notify_rx: tokio::sync::broadcast::Receiver<crate::control::RattanNotify>,
133+
) {
134+
self.notify_rx = Some(notify_rx);
135+
}
108136
}
109137

110138
// Loss pattern will repeat the last value until stop dropping packets.
@@ -191,6 +219,8 @@ where
191219
prev_loss: 0,
192220
rng,
193221
state: AtomicI32::new(0),
222+
notify_rx: None,
223+
started: false,
194224
},
195225
control_interface: Arc::new(LossCellControlInterface { pattern }),
196226
})
@@ -214,6 +244,8 @@ where
214244
prev_loss: usize,
215245
rng: R,
216246
state: AtomicI32,
247+
notify_rx: Option<tokio::sync::broadcast::Receiver<crate::control::RattanNotify>>,
248+
started: bool,
217249
}
218250

219251
impl<P, R> LossReplayCellEgress<P, R>
@@ -267,6 +299,26 @@ where
267299
R: Rng + Send + Sync,
268300
{
269301
async fn dequeue(&mut self) -> Option<P> {
302+
// Wait for Start notify if not started yet
303+
if !self.started {
304+
if let Some(notify_rx) = &mut self.notify_rx {
305+
match notify_rx.recv().await {
306+
Ok(crate::control::RattanNotify::Start) => {
307+
self.reset();
308+
self.change_state(2);
309+
self.started = true;
310+
}
311+
Ok(crate::control::RattanNotify::FirstPacket) => {
312+
// Continue waiting for Start notify
313+
}
314+
Err(_) => {
315+
// Notify channel closed, exit
316+
return None;
317+
}
318+
}
319+
}
320+
}
321+
270322
let packet = loop {
271323
tokio::select! {
272324
biased;
@@ -315,6 +367,13 @@ where
315367
self.state
316368
.store(state, std::sync::atomic::Ordering::Release);
317369
}
370+
371+
fn set_notify_receiver(
372+
&mut self,
373+
notify_rx: tokio::sync::broadcast::Receiver<crate::control::RattanNotify>,
374+
) {
375+
self.notify_rx = Some(notify_rx);
376+
}
318377
}
319378

320379
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
@@ -407,6 +466,8 @@ where
407466
prev_loss: 0,
408467
rng,
409468
state: AtomicI32::new(0),
469+
notify_rx: None,
470+
started: false,
410471
},
411472
control_interface: Arc::new(LossReplayCellControlInterface { config_tx }),
412473
})

rattan-core/src/cells/mod.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -276,6 +276,13 @@ where
276276

277277
/// 0 means drop, 1 means pass-through, 2 means normal operation
278278
fn change_state(&self, _state: i32) {}
279+
280+
/// Set the notify receiver for the cell to handle Start signals internally
281+
fn set_notify_receiver(
282+
&mut self,
283+
_notify_rx: tokio::sync::broadcast::Receiver<crate::control::RattanNotify>,
284+
) {
285+
}
279286
}
280287

281288
pub trait ControlInterface: Send + Sync + 'static {

0 commit comments

Comments
 (0)