Skip to content

Commit 8cc7ac3

Browse files
authored
feat(pubsub): add APIs to shutdown subscriber (#5257)
Most of the work for #5024 (the rest are cleanups) Provide a way for applications to both signal and await a shutdown of a subscriber message stream. --- Docs look like: <img width="1277" height="557" alt="Screenshot From 2026-04-03 12-52-40" src="https://github.com/user-attachments/assets/e018716d-270e-4864-a02c-36f07bef2151" /> <img width="1287" height="965" alt="Screenshot From 2026-04-03 12-54-36" src="https://github.com/user-attachments/assets/4711d483-b2e1-46d6-83e6-8be4ce34a61c" />
1 parent c295557 commit 8cc7ac3

3 files changed

Lines changed: 24 additions & 5 deletions

File tree

src/pubsub/src/subscriber.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ pub mod handler;
1616

1717
pub use message_stream::MessageStream;
1818
pub use shutdown_behavior::ShutdownBehavior;
19+
pub use shutdown_token::ShutdownToken;
1920

2021
pub(super) mod builder;
2122
pub(super) mod client;
@@ -27,7 +28,6 @@ mod leaser;
2728
mod message_stream;
2829
mod retry_policy;
2930
mod shutdown_behavior;
30-
#[allow(dead_code)] // TODO(#5024) - implementation in progress...
3131
mod shutdown_token;
3232
mod stream;
3333
mod stub;

src/pubsub/src/subscriber/message_stream.rs

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ use super::lease_loop::LeaseLoop;
1818
use super::lease_state::{AtLeastOnceInfo, ExactlyOnceInfo, LeaseInfo, LeaseOptions, NewMessage};
1919
use super::leaser::DefaultLeaser;
2020
use super::retry_policy::StreamRetryPolicy;
21-
#[cfg(test)]
2221
use super::shutdown_token::ShutdownToken;
2322
use super::stream::Stream;
2423
use super::stub::TonicStreaming as _;
@@ -65,7 +64,6 @@ pub struct MessageStream {
6564
/// the same time.
6665
inner: MessageStreamImpl,
6766

68-
#[allow(dead_code)] // TODO(#5024) - implementation in progress...
6967
/// This future is ready when the lease loop shutdown completes.
7068
lease_loop: Shared<BoxFuture<'static, ()>>,
7169

@@ -254,7 +252,28 @@ impl MessageStream {
254252
}))
255253
}
256254

257-
#[cfg(test)] // TODO(#5024) - document and make public
255+
/// Returns a shutdown token for the stream.
256+
///
257+
/// # Example
258+
/// ```
259+
/// # use google_cloud_pubsub::subscriber::MessageStream;
260+
/// # async fn sample(mut stream: MessageStream) {
261+
/// // Get a shutdown token for the stream.
262+
/// let shutdown_token = stream.shutdown_token();
263+
///
264+
/// // Signal and await a shutdown of the stream.
265+
/// shutdown_token.shutdown().await;
266+
///
267+
/// // The stream stops yielding messages after a cancel.
268+
/// assert!(stream.next().await.is_none());
269+
/// # }
270+
/// ```
271+
///
272+
/// Use this token to signal and/or await shutdown of the stream.
273+
///
274+
/// Awaiting a stream shutdown gives the subscriber time to flush its
275+
/// pending acknowledgements, and schedule other messages for redelivery to
276+
/// another client as soon as possible.
258277
pub fn shutdown_token(&self) -> ShutdownToken {
259278
ShutdownToken {
260279
inner: self.shutdown.clone(),

src/pubsub/src/subscriber/shutdown_token.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use tokio_util::sync::CancellationToken;
1818
/// A token to signal and await shutdown of a stream.
1919
///
2020
/// # Example
21-
/// ```no_rust
21+
/// ```
2222
/// use google_cloud_pubsub::subscriber::MessageStream;
2323
/// async fn sample(stream: MessageStream) {
2424
/// // Get a shutdown token for the stream.

0 commit comments

Comments
 (0)