Skip to content

Commit 14b544b

Browse files
authored
fix: robust against body read failures in list endpoint (#3644)
Addressing errors like: ``` LanceError(IO): Generic GCS error: Error getting list response body: error decoding response body, /src/lance/rust/lance-table/src/io/commit.rs:216:17 ```
1 parent b82ebd2 commit 14b544b

3 files changed

Lines changed: 167 additions & 1 deletion

File tree

rust/lance-io/src/object_store.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
use std::collections::HashMap;
77
use std::ops::Range;
88
use std::path::PathBuf;
9+
use std::pin::Pin;
910
use std::str::FromStr;
1011
use std::sync::Arc;
1112
use std::time::{Duration, SystemTime};
@@ -16,9 +17,11 @@ use aws_credential_types::provider::ProvideCredentials;
1617
use bytes::Bytes;
1718
use chrono::{DateTime, Utc};
1819
use deepsize::DeepSizeOf;
20+
use futures::Stream;
1921
use futures::{future, stream::BoxStream, StreamExt, TryStreamExt};
2022
use lance_core::utils::parse::str_is_truthy;
2123
use lance_core::utils::tokio::get_num_compute_intensive_cpus;
24+
use list_retry::ListRetryStream;
2225
use object_store::aws::{
2326
AmazonS3ConfigKey, AwsCredential as ObjectStoreAwsCredential, AwsCredentialProvider,
2427
};
@@ -37,6 +40,7 @@ use tokio::sync::RwLock;
3740
use url::Url;
3841

3942
use super::local::LocalObjectReader;
43+
mod list_retry;
4044
mod tracing;
4145
use self::tracing::ObjectStoreTracingExt;
4246
use crate::object_writer::WriteResult;
@@ -620,6 +624,13 @@ impl ObjectStore {
620624
.collect())
621625
}
622626

627+
pub fn list(
628+
&self,
629+
path: Option<Path>,
630+
) -> Pin<Box<dyn Stream<Item = Result<ObjectMeta>> + Send>> {
631+
Box::pin(ListRetryStream::new(self.inner.clone(), path, 5).map(|m| m.map_err(|e| e.into())))
632+
}
633+
623634
/// Read all files (start from base directory) recursively
624635
///
625636
/// unmodified_since can be specified to only return files that have not been modified since the given time.
Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// SPDX-FileCopyrightText: Copyright The Lance Authors
3+
4+
use std::{pin::Pin, sync::Arc, task::Poll};
5+
6+
use futures::{Stream, StreamExt};
7+
use object_store::{path::Path, ObjectMeta, ObjectStore};
8+
use tokio::task::JoinHandle;
9+
10+
/// ObjectStore::list() and ObjectStore::list_with_offset() return a stream
11+
/// where the lifetime is tied to the object store. This makes it hard to wrap.
12+
/// So here we put it inside a tokio task and return a channel receiver.
13+
struct StaticListStream {
14+
rx: tokio::sync::mpsc::Receiver<Result<ObjectMeta, object_store::Error>>,
15+
handle: JoinHandle<()>,
16+
}
17+
18+
impl StaticListStream {
19+
fn new(object_store: Arc<dyn ObjectStore>, prefix: Option<Path>, offset: Option<Path>) -> Self {
20+
let (tx, rx) = tokio::sync::mpsc::channel(100);
21+
let handle = tokio::spawn(async move {
22+
let mut stream = if let Some(offset) = offset {
23+
object_store.list_with_offset(prefix.as_ref(), &offset)
24+
} else {
25+
object_store.list(prefix.as_ref())
26+
};
27+
while let Some(item) = stream.next().await {
28+
if tx.send(item).await.is_err() {
29+
break;
30+
}
31+
}
32+
});
33+
Self { rx, handle }
34+
}
35+
36+
fn abort(&self) {
37+
self.handle.abort();
38+
}
39+
}
40+
41+
impl Stream for StaticListStream {
42+
type Item = Result<ObjectMeta, object_store::Error>;
43+
44+
fn poll_next(
45+
self: Pin<&mut Self>,
46+
cx: &mut std::task::Context<'_>,
47+
) -> std::task::Poll<Option<Self::Item>> {
48+
let this = self.get_mut();
49+
match this.rx.poll_recv(cx) {
50+
Poll::Ready(Some(item)) => Poll::Ready(Some(item)),
51+
Poll::Ready(None) => Poll::Ready(None),
52+
Poll::Pending if this.handle.is_finished() => Poll::Ready(None),
53+
Poll::Pending => Poll::Pending,
54+
}
55+
}
56+
}
57+
58+
/// A stream that does outer retries on list operations.
59+
///
60+
/// This is to handle request responses that ObjectStore doesn't handle, such as
61+
/// the error `error decoding response body` from queries to GCS.
62+
pub struct ListRetryStream {
63+
object_store: Arc<dyn ObjectStore>,
64+
current_stream: StaticListStream,
65+
prefix: Option<Path>,
66+
last_successful_key: Option<Path>,
67+
max_retries: usize,
68+
current_retries: usize,
69+
}
70+
71+
impl ListRetryStream {
72+
pub fn new(
73+
object_store: Arc<dyn ObjectStore>,
74+
prefix: Option<Path>,
75+
max_retries: usize,
76+
) -> Self {
77+
let current_stream = StaticListStream::new(object_store.clone(), prefix.clone(), None);
78+
Self {
79+
object_store,
80+
current_stream,
81+
prefix,
82+
last_successful_key: None,
83+
max_retries,
84+
current_retries: 0,
85+
}
86+
}
87+
88+
fn is_retryable(error: &object_store::Error) -> bool {
89+
!matches!(
90+
error,
91+
object_store::Error::NotFound { .. }
92+
| object_store::Error::InvalidPath { .. }
93+
| object_store::Error::NotSupported { .. }
94+
| object_store::Error::NotImplemented
95+
)
96+
}
97+
}
98+
99+
impl Stream for ListRetryStream {
100+
type Item = Result<ObjectMeta, object_store::Error>;
101+
102+
fn poll_next(
103+
self: std::pin::Pin<&mut Self>,
104+
cx: &mut std::task::Context<'_>,
105+
) -> std::task::Poll<Option<Self::Item>> {
106+
let this = self.get_mut();
107+
loop {
108+
match this.current_stream.poll_next_unpin(cx) {
109+
Poll::Ready(Some(Ok(meta))) => {
110+
this.last_successful_key = Some(meta.location.clone());
111+
return Poll::Ready(Some(Ok(meta)));
112+
}
113+
Poll::Ready(None) => {
114+
// If the stream is done, return None
115+
return Poll::Ready(None);
116+
}
117+
Poll::Ready(Some(Err(error))) if Self::is_retryable(&error) => {
118+
if this.current_retries < this.max_retries {
119+
this.current_retries += 1;
120+
121+
this.current_stream.abort();
122+
this.current_stream = StaticListStream::new(
123+
this.object_store.clone(),
124+
this.prefix.clone(),
125+
this.last_successful_key.clone(),
126+
);
127+
128+
continue;
129+
} else {
130+
return Poll::Ready(Some(Err(error)));
131+
}
132+
}
133+
Poll::Ready(Some(Err(error))) => {
134+
return Poll::Ready(Some(Err(error)));
135+
}
136+
Poll::Pending => {
137+
return Poll::Pending;
138+
}
139+
}
140+
}
141+
}
142+
}
143+
144+
#[cfg(test)]
145+
mod tests {
146+
use super::*;
147+
148+
fn assert_send<T: Send>() {}
149+
150+
#[test]
151+
fn test_list_retry_stream_send() {
152+
// Ensure that ListRetryStream is Send
153+
assert_send::<ListRetryStream>();
154+
}
155+
}

rust/lance-table/src/io/commit.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,7 @@ async fn current_manifest_path(
236236
}
237237
}
238238

239-
let manifest_files = object_store.inner.list(Some(&base.child(VERSIONS_DIR)));
239+
let manifest_files = object_store.list(Some(base.child(VERSIONS_DIR)));
240240

241241
let mut valid_manifests = manifest_files.try_filter_map(|res| {
242242
if let Some(scheme) = ManifestNamingScheme::detect_scheme(res.location.filename().unwrap())

0 commit comments

Comments
 (0)