Related to #4869, #4868
Those are more concerned with the shutdown behavior. This issue is about how the application can signal and await a shutdown.
Background
Messages can get "frozen" for 10 minutes if a program exits before the subscriber can flush pending nacks. Likewise, there may be unnecessary redeliveries if the subscriber does not flush pending acks. Letting an application await shutdown can avoid or mitigate these problems.
It is also somewhat difficult for an application to signal a shutdown. It has to call drop(stream), which means it needs to own the stream. It would be more convenient if an application could spawn a background task and signal shutdown from it.
Design
We should add a ShutdownToken, with an accessor for it on the MessageStream:
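// Assumed imports: the `futures` and `tokio-util` crates.
use futures::future::{BoxFuture, Shared};
use tokio_util::sync::CancellationToken;

#[derive(Clone)]
pub struct ShutdownToken {
    // Signals the stream to begin shutting down.
    inner: CancellationToken,
    // Awaited to wait for the shutdown to complete. `Shared` lets every clone await it.
    fut: Shared<BoxFuture<'static, ()>>,
}

impl ShutdownToken {
    // I think it is convenient to shut down and await.
    pub async fn shutdown(&self) {
        self.inner.cancel();
        // Clone the shared future: awaiting `self.fut` directly would move it out of `&self`.
        self.fut.clone().await
    }

    // We could also add this to just signal. But initially, I would not add this.
    pub fn signal_shutdown(&self) {
        self.inner.cancel();
    }
}

impl MessageStream {
    pub fn shutdown_token(&self) -> ShutdownToken {
        self.shutdown.clone()
    }
}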
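To make the signal-then-await pattern concrete without pulling in an async runtime, here is a minimal std-only analogue built on threads. It is only a sketch: `Token`, `State`, `run_worker`, and the "flush" step are hypothetical stand-ins, not part of the proposed API. The real ShutdownToken would wrap a CancellationToken and a shared future as described above.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

#[derive(Default)]
struct State {
    cancelled: bool, // set by the application to request shutdown
    done: bool,      // set by the worker once its pending work is flushed
}

/// Hypothetical, thread-based stand-in for the proposed `ShutdownToken`.
#[derive(Clone, Default)]
struct Token {
    inner: Arc<(Mutex<State>, Condvar)>,
}

impl Token {
    /// Signal shutdown, then block until the worker reports completion.
    fn shutdown(&self) {
        let (lock, cv) = &*self.inner;
        let mut state = lock.lock().unwrap();
        state.cancelled = true;
        cv.notify_all();
        while !state.done {
            state = cv.wait(state).unwrap();
        }
    }

    fn is_cancelled(&self) -> bool {
        self.inner.0.lock().unwrap().cancelled
    }

    /// Called by the worker after it has flushed its pending work.
    fn mark_done(&self) {
        let (lock, cv) = &*self.inner;
        lock.lock().unwrap().done = true;
        cv.notify_all();
    }
}

/// Stand-in for the subscriber: works until cancelled, then "flushes".
fn run_worker(token: Token) -> thread::JoinHandle<u32> {
    thread::spawn(move || {
        let mut pending = 0u32;
        while !token.is_cancelled() {
            pending += 1; // pretend a message was acked locally
            thread::sleep(Duration::from_millis(1));
        }
        let flushed = pending; // pretend all pending acks were sent
        token.mark_done();
        flushed
    })
}

fn main() {
    let token = Token::default();
    let worker = run_worker(token.clone());
    thread::sleep(Duration::from_millis(10));
    // The caller does not own the worker's state; it only holds a token clone.
    token.shutdown();
    let flushed = worker.join().unwrap();
    println!("flushed {flushed} pending acks before exit");
}
```

The point of the sketch is that `shutdown()` returns only after the worker has finished flushing, and that any holder of a token clone can trigger it, which is the property the background-task use case needs.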
Work
MessageStream
CancellationToken to MessageStream
ShutdownToken
shutdown_token() accessor
drop(stream)
MessageStream::close()