Skip to content

Commit 0a84fba

Browse files
authored
impl(pubsub): update last extension for successful extend (#5530)
Lease state propagate extension results back to LeaseLoop using by the LeaseEvent ExtendCompleted and ExtendCompletedEO. The result are used to update the last extension value in the lease state. Fixes: #5048
1 parent adfeb15 commit 0a84fba

4 files changed

Lines changed: 98 additions & 36 deletions

File tree

src/pubsub/src/subscriber/lease_loop.rs

Lines changed: 46 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,12 @@ impl LeaseLoop {
8787
match event {
8888
LeaseEvent::Flush => state.flush(),
8989
LeaseEvent::Extend => state.extend(),
90+
LeaseEvent::ExtendCompleted(ack_ids) => {
91+
state.update_last_extension(ack_ids);
92+
}
93+
LeaseEvent::ExtendCompletedEO(ack_ids) => {
94+
state.update_last_extension_eo(ack_ids);
95+
}
9096
}
9197
},
9298
message = message_rx.recv() => {
@@ -378,7 +384,7 @@ mod tests {
378384
}
379385

380386
#[tokio_test_no_panics(start_paused = true)]
381-
async fn deadline_interval() -> anyhow::Result<()> {
387+
async fn extend_interval() -> anyhow::Result<()> {
382388
const EXTEND_PERIOD: Duration = Duration::from_secs(1);
383389
const EXTEND_START: Duration = Duration::from_millis(200);
384390

@@ -390,8 +396,9 @@ mod tests {
390396
flush_start: Duration::from_secs(900),
391397
extend_period: EXTEND_PERIOD,
392398
extend_start: EXTEND_START,
393-
// extend leases for all messages on the timer
394-
max_lease_extension: Duration::ZERO,
399+
// max_lease_extension is set to 7 seconds as the test advances
400+
// extend_period twice and the buffer is 5 seconds.
401+
max_lease_extension: Duration::from_secs(7),
395402
..Default::default()
396403
};
397404
let lease_loop = LeaseLoop::new(mock.clone(), confirmed_rx, options);
@@ -403,6 +410,14 @@ mod tests {
403410
lease_loop.strong_message_tx().send(test_message(i))?;
404411
}
405412

413+
// Seed the lease loop with some exactly-once messages
414+
for i in 30..60 {
415+
lease_loop.strong_message_tx().send(NewMessage {
416+
ack_id: test_id(i),
417+
lease_info: exactly_once_info(),
418+
})?;
419+
}
420+
406421
// Confirm initial state
407422
mock.lock().await.checkpoint();
408423

@@ -414,6 +429,14 @@ mod tests {
414429
.times(1)
415430
.withf(|v| sorted(v) == test_ids(0..30))
416431
.returning(move |ack_ids| ack_ids);
432+
433+
mock.lock()
434+
.await
435+
.expect_extend()
436+
.times(1)
437+
.withf(|v| sorted(v) == test_ids(30..60))
438+
.returning(move |ack_ids| ack_ids);
439+
417440
tokio::time::advance(EXTEND_START).await;
418441

419442
// Yield the current task, so tokio can execute the flush().
@@ -426,14 +449,32 @@ mod tests {
426449
lease_loop.strong_ack_tx().send(Action::Ack(test_id(i)))?;
427450
}
428451

429-
// Advance to and validate the second extension
452+
// Advance to and validate the second extension period (should be skipped)
453+
{
454+
mock.lock().await.expect_extend().times(0);
455+
tokio::time::advance(EXTEND_PERIOD).await;
456+
457+
// Yield the current task, so tokio can execute the flush().
458+
tokio::task::yield_now().await;
459+
mock.lock().await.checkpoint();
460+
}
461+
462+
// Advance to and validate the third extension (should be extended)
430463
{
431464
mock.lock()
432465
.await
433466
.expect_extend()
434467
.times(1)
435468
.withf(|v| sorted(v) == test_ids(10..30))
436-
.returning(move |ack_ids| ack_ids);
469+
.returning(|ack_ids| ack_ids);
470+
471+
mock.lock()
472+
.await
473+
.expect_extend()
474+
.times(1)
475+
.withf(|v| sorted(v) == test_ids(30..60))
476+
.returning(|ack_ids| ack_ids);
477+
437478
tokio::time::advance(EXTEND_PERIOD).await;
438479

439480
// Yield the current task, so tokio can execute the flush().

src/pubsub/src/subscriber/lease_state.rs

Lines changed: 48 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,10 @@ pub(super) enum LeaseEvent {
188188
Flush,
189189
/// Extend leases
190190
Extend,
191+
/// Pending extensions completed
192+
ExtendCompleted(Vec<String>),
193+
/// Pending exactly-once extensions completed
194+
ExtendCompletedEO(Vec<String>),
191195
}
192196

193197
impl<L> LeaseState<L>
@@ -225,9 +229,27 @@ where
225229
return LeaseEvent::Flush;
226230
}
227231

228-
tokio::select! {
229-
_ = self.flush_interval.tick() => LeaseEvent::Flush,
230-
_ = self.extend_interval.tick() => LeaseEvent::Extend,
232+
loop {
233+
tokio::select! {
234+
_ = self.flush_interval.tick() => return LeaseEvent::Flush,
235+
_ = self.extend_interval.tick() => return LeaseEvent::Extend,
236+
res = self.pending_extends.join_next(), if !self.pending_extends.is_empty() => {
237+
if let Some(Ok(ack_ids)) = res {
238+
return LeaseEvent::ExtendCompleted(ack_ids);
239+
} else {
240+
// swallow the JoinError.
241+
continue;
242+
}
243+
}
244+
res = self.eo_pending_extends.join_next(), if !self.eo_pending_extends.is_empty() => {
245+
if let Some(Ok(ack_ids)) = res {
246+
return LeaseEvent::ExtendCompletedEO(ack_ids);
247+
} else {
248+
// swallow the JoinError.
249+
continue;
250+
}
251+
}
252+
}
231253
}
232254
}
233255

@@ -262,14 +284,12 @@ where
262284

263285
/// Updates the `last_extension` timestamp for the given at-least-once ack IDs
264286
/// with the completion time of a successful extension RPC.
265-
#[allow(dead_code)]
266287
pub(super) fn update_last_extension(&mut self, ack_ids: Vec<String>) {
267288
self.leases.update_last_extension(&ack_ids);
268289
}
269290

270291
/// Updates the `last_extension` timestamp for the given exactly-once ack IDs
271292
/// with the completion time of a successful extension RPC.
272-
#[allow(dead_code)]
273293
pub(super) fn update_last_extension_eo(&mut self, ack_ids: Vec<String>) {
274294
self.eo_leases.update_last_extension(&ack_ids);
275295
}
@@ -320,9 +340,6 @@ where
320340
self.eo_pending_extends
321341
.spawn(async move { leaser.extend(ack_ids).await });
322342
}
323-
324-
// TODO(#5048) - we could process the results as a lease event.
325-
while self.pending_extends.try_join_next().is_some() {}
326343
}
327344

328345
/// Shutdown the leaser
@@ -955,9 +972,13 @@ pub(super) mod tests {
955972
async fn pending_extends_size_management() {
956973
let mut mock = MockLeaser::new();
957974
mock.expect_extend()
958-
.times(2)
975+
.times(1)
959976
.withf(|v| *v == vec![test_id(1)])
960-
.returning(move |ack_ids| ack_ids);
977+
.returning(|ack_ids| ack_ids);
978+
mock.expect_extend()
979+
.times(1)
980+
.withf(|v| *v == vec![test_id(2)])
981+
.returning(|ack_ids| ack_ids);
961982

962983
let options = LeaseOptions {
963984
max_lease_extension: Duration::ZERO,
@@ -966,21 +987,26 @@ pub(super) mod tests {
966987
let mut state = LeaseState::new(Arc::new(mock), options);
967988

968989
state.add(test_id(1), at_least_once_info());
990+
state.add(test_id(2), exactly_once_info());
969991
state.extend();
970-
// Yield execution so the extend attempt can execute.
971-
tokio::task::yield_now().await;
972992

973-
// TODO(#5048) - We currently clean up the completed pending extends in
974-
// `LeaseState::extend()`. If we decide to clean up the pending extends
975-
// elsewhere, this test will need an update.
976-
state.extend();
977-
let pending_extends = state.pending_extends.len();
978-
assert!(
979-
pending_extends < 2,
980-
"The first lease extension attempt should have completed. We should not hold onto it."
981-
);
993+
let mut events = Vec::new();
994+
events.push(state.next_event().await);
995+
events.push(state.next_event().await);
996+
997+
assert!(events.contains(&LeaseEvent::ExtendCompleted(test_ids(1..2))));
998+
assert!(events.contains(&LeaseEvent::ExtendCompletedEO(test_ids(2..3))));
982999

983-
let _ = state.pending_extends.join_all().await;
1000+
assert_eq!(
1001+
state.pending_extends.len(),
1002+
0,
1003+
"Completed at-least-once extensions should be cleaned up"
1004+
);
1005+
assert_eq!(
1006+
state.eo_pending_extends.len(),
1007+
0,
1008+
"Completed exactly-once extensions should be cleaned up"
1009+
);
9841010
}
9851011

9861012
#[tokio::test]

src/pubsub/src/subscriber/lease_state/at_least_once.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -111,9 +111,6 @@ impl Leases {
111111
// Flush the batch when it is full.
112112
batches.push(std::mem::take(&mut batch));
113113
}
114-
// TODO(#5048): Do not update last_extension here after update_last_extension fn
115-
// is used to report successful extends.
116-
info.last_extension = Some(now);
117114
true
118115
}
119116
});
@@ -589,6 +586,7 @@ mod tests {
589586
// We should always send a receipt lease extension upon receiving a
590587
// message.
591588
let batches = leases.retain(MAX_LEASE, MAX_LEASE_EXTENSION);
589+
leases.update_last_extension(&test_ids(0..1));
592590
assert_eq!(batches, vec![vec![test_id(0)]]);
593591
assert_eq!(
594592
TestLeases {
@@ -617,6 +615,7 @@ mod tests {
617615

618616
// We need to extend the lease again.
619617
let batches = leases.retain(MAX_LEASE, MAX_LEASE_EXTENSION);
618+
leases.update_last_extension(&test_ids(0..1));
620619
assert_eq!(batches, vec![vec![test_id(0)]]);
621620
assert_eq!(
622621
TestLeases {

src/pubsub/src/subscriber/lease_state/exactly_once.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -130,9 +130,6 @@ impl Leases {
130130
None
131131
} else {
132132
// Continue to extend messages being acked.
133-
// TODO(#5048): Do not update last_extension here after update_last_extension fn
134-
// is used to report successful extends.
135-
info.last_extension = Some(now);
136133
Some(id.clone())
137134
}
138135
}
@@ -148,9 +145,6 @@ impl Leases {
148145
None
149146
} else {
150147
// Extend leases for all other messages
151-
// TODO(#5048): Do not update last_extension here after update_last_extension fn
152-
// is used to report successful extends.
153-
info.last_extension = Some(now);
154148
Some(id.clone())
155149
}
156150
}
@@ -874,6 +868,7 @@ mod tests {
874868
// We should always send a receipt lease extension upon receiving a
875869
// message.
876870
let batches = leases.retain(MAX_LEASE, MAX_LEASE_EXTENSION);
871+
leases.update_last_extension(&test_ids(0..2));
877872
let flattened = Batches::flatten(batches);
878873
assert_eq!(sorted(&flattened.ack_ids), test_ids(0..2));
879874
assert_eq!(
@@ -903,6 +898,7 @@ mod tests {
903898

904899
// We need to extend the lease again.
905900
let batches = leases.retain(MAX_LEASE, MAX_LEASE_EXTENSION);
901+
leases.update_last_extension(&test_ids(0..2));
906902
let flattened = Batches::flatten(batches);
907903
assert_eq!(sorted(&flattened.ack_ids), test_ids(0..2));
908904
assert_eq!(

0 commit comments

Comments
 (0)