Skip to content

Commit 0f851ea

Browse files
authored
feat(pubsub): expose RetryableErrors for customized retry configuration (#5411)
This change introduces a new public `retry_policy` module in the `google-cloud-pubsub` crate, exporting the `RetryableErrors` struct. Users can now explicitly reference and decorate this policy to customize retry limits and durations on Publisher instances. Fixes #5366
1 parent edd6308 commit 0f851ea

6 files changed

Lines changed: 220 additions & 198 deletions

File tree

src/pubsub/examples/src/publisher/publish_with_retry_settings.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,17 +14,18 @@
1414

1515
// [START pubsub_publisher_retry_settings]
1616
use google_cloud_gax::exponential_backoff::ExponentialBackoffBuilder;
17-
use google_cloud_gax::retry_policy::{AlwaysRetry, RetryPolicyExt};
17+
use google_cloud_gax::retry_policy::RetryPolicyExt;
1818
use google_cloud_pubsub::client::Publisher;
1919
use google_cloud_pubsub::model::Message;
20+
use google_cloud_pubsub::retry_policy::RetryableErrors;
2021
use std::time::Duration;
2122

2223
pub async fn sample(project_id: &str, topic_id: &str) -> anyhow::Result<()> {
2324
let topic_name = format!("projects/{project_id}/topics/{topic_id}");
2425

2526
// Configure custom retry settings.
26-
// In this example, we retry with a time limit of 10 minutes.
27-
let retry_policy = AlwaysRetry.with_time_limit(Duration::from_secs(600));
27+
// In this example, we retry with a time limit of 5 minutes.
28+
let retry_policy = RetryableErrors.with_time_limit(Duration::from_secs(300));
2829

2930
// Configure custom backoff settings.
3031
let backoff_policy = ExponentialBackoffBuilder::new()

src/pubsub/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,7 @@ pub mod client {
166166
}
167167

168168
pub mod error;
169+
pub mod retry_policy;
169170

170171
/// Traits to mock the clients in this library.
171172
pub mod stub {

src/pubsub/src/publisher/builder.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -188,9 +188,10 @@ impl PublisherBuilder {
188188
/// ```
189189
/// # use google_cloud_pubsub::client::Publisher;
190190
/// # async fn sample() -> anyhow::Result<()> {
191-
/// use google_cloud_gax::retry_policy::{AlwaysRetry, RetryPolicyExt};
191+
/// use google_cloud_gax::retry_policy::RetryPolicyExt;
192+
/// use google_cloud_pubsub::retry_policy::RetryableErrors;
192193
/// let client = Publisher::builder("projects/my-project/topics/my-topic")
193-
/// .with_retry_policy(AlwaysRetry.with_attempt_limit(3))
194+
/// .with_retry_policy(RetryableErrors.with_attempt_limit(3))
194195
/// .build().await?;
195196
/// # Ok(()) };
196197
/// ```

src/pubsub/src/publisher/client_builder.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -139,9 +139,10 @@ impl BasePublisherBuilder {
139139
/// ```
140140
/// # use google_cloud_pubsub::client::BasePublisher;
141141
/// # async fn sample() -> anyhow::Result<()> {
142-
/// use google_cloud_gax::retry_policy::{AlwaysRetry, RetryPolicyExt};
142+
/// use google_cloud_gax::retry_policy::RetryPolicyExt;
143+
/// use google_cloud_pubsub::retry_policy::RetryableErrors;
143144
/// let client = BasePublisher::builder()
144-
/// .with_retry_policy(AlwaysRetry.with_attempt_limit(3))
145+
/// .with_retry_policy(RetryableErrors.with_attempt_limit(3))
145146
/// .build()
146147
/// .await?;
147148
/// # Ok(()) };

src/pubsub/src/publisher/retry_policy.rs

Lines changed: 2 additions & 191 deletions
Original file line numberDiff line numberDiff line change
@@ -12,31 +12,10 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
//! Defines the retry policies for the Cloud Pub/Sub Publisher.
16-
//!
17-
//! The Pub/Sub service [recommends] retrying several transient error codes.
18-
//!
19-
//! - [Unavailable][Code::Unavailable]
20-
//! - [Internal][Code::Internal]
21-
//! - [Resource Exhausted][Code::ResourceExhausted]
22-
//! - [Aborted][Code::Aborted]
23-
//! - [Deadline Exceeded][Code::DeadlineExceeded]
24-
//! - [Cancelled][Code::Cancelled]
25-
//! - [Unknown][Code::Unknown]
26-
//!
27-
//! [recommends]: https://docs.cloud.google.com/pubsub/docs/reference/error-codes
28-
//! [Code::Unavailable]: google_cloud_gax::error::rpc::Code::Unavailable
29-
//! [Code::Internal]: google_cloud_gax::error::rpc::Code::Internal
30-
//! [Code::ResourceExhausted]: google_cloud_gax::error::rpc::Code::ResourceExhausted
31-
//! [Code::Aborted]: google_cloud_gax::error::rpc::Code::Aborted
32-
//! [Code::DeadlineExceeded]: google_cloud_gax::error::rpc::Code::DeadlineExceeded
33-
//! [Code::Cancelled]: google_cloud_gax::error::rpc::Code::Cancelled
34-
//! [Code::Unknown]: google_cloud_gax::error::rpc::Code::Unknown
15+
//! Defines the internal retry policies for the Cloud Pub/Sub Publisher.
3516
36-
use crate::Error;
17+
use crate::retry_policy::RetryableErrors;
3718
use google_cloud_gax::retry_policy::{RetryPolicy, RetryPolicyExt};
38-
use google_cloud_gax::retry_result::RetryResult;
39-
use google_cloud_gax::retry_state::RetryState;
4019
use std::time::Duration;
4120

4221
/// The default retry policy for the Pub/Sub publisher.
@@ -46,171 +25,3 @@ use std::time::Duration;
4625
pub(crate) fn default_retry_policy() -> impl RetryPolicy {
4726
RetryableErrors.with_time_limit(Duration::from_secs(600))
4827
}
49-
50-
/// Follows the retry strategy recommended by the Cloud Pub/Sub guides on
51-
/// [error codes].
52-
///
53-
/// This policy must be decorated to limit the duration of the retry loop.
54-
///
55-
/// [error codes]: https://docs.cloud.google.com/pubsub/docs/reference/error-codes
56-
#[derive(Clone, Debug)]
57-
pub struct RetryableErrors;
58-
59-
impl RetryPolicy for RetryableErrors {
60-
fn on_error(&self, _state: &RetryState, error: Error) -> RetryResult {
61-
if error.is_transient_and_before_rpc() {
62-
return RetryResult::Continue(error);
63-
}
64-
65-
if error.is_io() || error.is_timeout() {
66-
return RetryResult::Continue(error);
67-
}
68-
69-
if error.is_transport() && error.http_status_code().is_none() {
70-
// Sometimes gRPC returns a transport error without an HTTP status
71-
// code. We treat all of these as I/O errors and therefore
72-
// retryable.
73-
return RetryResult::Continue(error);
74-
}
75-
76-
// Catch raw HTTP errors that may not have been mapped to a gRPC status.
77-
// - 408: Request Timeout
78-
// - 429: Resource Exhausted
79-
// - 499: Cancelled Request
80-
// - 5xx: Internal Server Error, Bad Gateway, etc.
81-
if let Some(408 | 429 | 499 | 500..600) = error.http_status_code() {
82-
return RetryResult::Continue(error);
83-
}
84-
85-
if let Some(status) = error.status() {
86-
use google_cloud_gax::error::rpc::Code;
87-
return match status.code {
88-
Code::Aborted
89-
| Code::Cancelled
90-
| Code::DeadlineExceeded
91-
| Code::Internal
92-
| Code::ResourceExhausted
93-
| Code::Unavailable
94-
| Code::Unknown => RetryResult::Continue(error),
95-
_ => RetryResult::Permanent(error),
96-
};
97-
}
98-
99-
RetryResult::Permanent(error)
100-
}
101-
}
102-
103-
#[cfg(test)]
104-
mod tests {
105-
use super::*;
106-
use google_cloud_gax::error::rpc::{Code, Status};
107-
use google_cloud_gax::retry_state::RetryState;
108-
use http::HeaderMap;
109-
use test_case::test_case;
110-
111-
#[test]
112-
fn transport_reset() {
113-
let p = RetryableErrors;
114-
assert!(
115-
p.on_error(&RetryState::default(), transport_err())
116-
.is_continue()
117-
);
118-
}
119-
120-
#[test_case(408)]
121-
#[test_case(429)]
122-
#[test_case(499)]
123-
#[test_case(500)]
124-
#[test_case(502)]
125-
#[test_case(503)]
126-
#[test_case(504)]
127-
fn retryable_http(code: u16) {
128-
let p = RetryableErrors;
129-
assert!(
130-
p.on_error(&RetryState::default(), http_error(code))
131-
.is_continue()
132-
);
133-
}
134-
135-
#[test_case(409)]
136-
#[test_case(400)]
137-
#[test_case(404)]
138-
fn permanent_http(code: u16) {
139-
let p = RetryableErrors;
140-
assert!(
141-
p.on_error(&RetryState::default(), http_error(code))
142-
.is_permanent()
143-
);
144-
}
145-
146-
#[test_case(Code::Unavailable)]
147-
#[test_case(Code::Internal)]
148-
#[test_case(Code::Aborted)]
149-
#[test_case(Code::ResourceExhausted)]
150-
#[test_case(Code::DeadlineExceeded)]
151-
#[test_case(Code::Cancelled)]
152-
#[test_case(Code::Unknown)]
153-
fn retryable_grpc(code: Code) {
154-
let p = RetryableErrors;
155-
assert!(
156-
p.on_error(&RetryState::default(), grpc_error(code))
157-
.is_continue()
158-
);
159-
}
160-
161-
#[test_case(Code::NotFound)]
162-
#[test_case(Code::PermissionDenied)]
163-
#[test_case(Code::InvalidArgument)]
164-
fn permanent_grpc(code: Code) {
165-
let p = RetryableErrors;
166-
assert!(
167-
p.on_error(&RetryState::default(), grpc_error(code))
168-
.is_permanent()
169-
);
170-
}
171-
172-
#[test]
173-
fn io() {
174-
let p = RetryableErrors;
175-
assert!(p.on_error(&RetryState::default(), io_error()).is_continue());
176-
}
177-
178-
#[test]
179-
fn permanent_auth() {
180-
let p = RetryableErrors;
181-
let auth_error =
182-
google_cloud_gax::error::CredentialsError::from_msg(false, "permanent auth error");
183-
assert!(
184-
p.on_error(&RetryState::default(), Error::authentication(auth_error))
185-
.is_permanent()
186-
);
187-
}
188-
189-
#[test]
190-
fn transient_auth() {
191-
let p = RetryableErrors;
192-
let auth_error =
193-
google_cloud_gax::error::CredentialsError::from_msg(true, "transient auth error");
194-
assert!(
195-
p.on_error(&RetryState::default(), Error::authentication(auth_error))
196-
.is_continue()
197-
);
198-
}
199-
200-
fn transport_err() -> Error {
201-
Error::transport(HeaderMap::new(), "connection closed")
202-
}
203-
204-
fn http_error(code: u16) -> Error {
205-
Error::http(code, HeaderMap::new(), bytes::Bytes::new())
206-
}
207-
208-
fn grpc_error(code: Code) -> Error {
209-
let status = Status::default().set_code(code).set_message("try again");
210-
Error::service(status)
211-
}
212-
213-
fn io_error() -> Error {
214-
Error::io(gaxi::grpc::tonic::Status::unavailable("try again"))
215-
}
216-
}

0 commit comments

Comments
 (0)