Skip to content

Commit be414ae

Browse files
committed
fix(storage): restore TLS for GCS OpenDAL backend
OpenDAL 0.56 uses reqwest 0.13 internally, so the workspace reqwest 0.12 TLS feature no longer enables TLS for OpenDAL after default-features = false strips OpenDAL's defaults. Enable OpenDAL's reqwest-rustls-tls feature from quickwit-storage's gcs feature so the GCS backend can use HTTPS again. Add a self-contained regression test that starts a local HTTPS server, points the GCS service at it, and performs a real range read through OpenDAL. The test uses a reqwest 0.13 dev alias with no TLS features of its own, so reverting the OpenDAL TLS feature makes the test fail instead of masking the regression. Closes #6477
1 parent f3880d5 commit be414ae

3 files changed

Lines changed: 260 additions & 1 deletion

File tree

quickwit/Cargo.lock

Lines changed: 115 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

quickwit/quickwit-storage/Cargo.toml

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,10 @@ reqwest = { workspace = true, optional = true }
6262
http = { workspace = true }
6363
mockall = { workspace = true }
6464
proptest = { workspace = true }
65+
reqwest-013 = { package = "reqwest", version = "0.13", default-features = false }
66+
rustls = { workspace = true }
6567
tokio = { workspace = true }
68+
tokio-rustls = { workspace = true }
6669
tracing-subscriber = { workspace = true }
6770

6871
aws-sdk-s3 = { workspace = true }
@@ -81,7 +84,11 @@ azure = [
8184
"azure_storage/enable_reqwest_rustls",
8285
"azure_storage_blobs/enable_reqwest_rustls",
8386
]
84-
gcs = ["dep:opendal", "opendal/services-gcs"]
87+
gcs = [
88+
"dep:opendal",
89+
"opendal/reqwest-rustls-tls",
90+
"opendal/services-gcs",
91+
]
8592
ci-test = []
8693
integration-testsuite = [
8794
"azure",

quickwit/quickwit-storage/src/opendal_storage/google_cloud_storage.rs

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,10 +113,55 @@ fn parse_google_uri(uri: &Uri) -> Option<(String, PathBuf)> {
113113

114114
#[cfg(test)]
115115
mod tests {
116+
use std::sync::Arc;
117+
118+
use base64::Engine;
119+
use opendal::Operator;
120+
use opendal::layers::HttpClientLayer;
121+
use opendal::raw::HttpClient;
116122
use quickwit_common::uri::Uri;
123+
use rustls::pki_types::{CertificateDer, PrivateKeyDer, PrivateSec1KeyDer};
124+
use tokio::io::{AsyncReadExt, AsyncWriteExt};
125+
use tokio::net::TcpListener;
126+
use tokio::task::JoinHandle;
117127

118128
use super::parse_google_uri;
119129

130+
// Test-only localhost certificate chain. The client below trusts only this
131+
// embedded CA, so the test never depends on the host root store.
132+
const TEST_CA_CERT_DER_BASE64: &str = concat!(
133+
"MIIBizCCATGgAwIBAgIURn3X2TGz13Wv1/p3x8S3y1ALQwkwCgYIKoZIzj0EAwIw",
134+
"GzEZMBcGA1UEAwwQUXVpY2t3aXQgVGVzdCBDQTAeFw0yNjA1MzAxMTE4NDJaFw0z",
135+
"NjA1MjcxMTE4NDJaMBsxGTAXBgNVBAMMEFF1aWNrd2l0IFRlc3QgQ0EwWTATBgcq",
136+
"hkjOPQIBBggqhkjOPQMBBwNCAATyMe2ARkjYkk+QnWZA8T+A99xLcHhMRdcMBElb",
137+
"NFYf2nccWkaaqEnUGOIe6Y6ODehYX5dpEWpwkXQ20p52mtDLo1MwUTAdBgNVHQ4E",
138+
"FgQUgwSEbLYdLqTp4uEWN3CqY4LkHgUwHwYDVR0jBBgwFoAUgwSEbLYdLqTp4uEW",
139+
"N3CqY4LkHgUwDwYDVR0TAQH/BAUwAwEB/zAKBggqhkjOPQQDAgNIADBFAiAvxQlF",
140+
"s1tG0Qs/rY9WMcKdvCiOyW044KePy4t9jzkhsgIhALBerDKsWc80bgBYWetpIZVS",
141+
"FdrixGuySBCdAG9w0rWn",
142+
);
143+
144+
const TEST_SERVER_CERT_DER_BASE64: &str = concat!(
145+
"MIIBuDCCAV6gAwIBAgIUIwRonRLHFnBmsr8U5Ytq9LFW6ZMwCgYIKoZIzj0EAwIw",
146+
"GzEZMBcGA1UEAwwQUXVpY2t3aXQgVGVzdCBDQTAeFw0yNjA1MzAxMTE4NDJaFw0z",
147+
"NjA1MjcxMTE4NDJaMBQxEjAQBgNVBAMMCWxvY2FsaG9zdDBZMBMGByqGSM49AgEG",
148+
"CCqGSM49AwEHA0IABFqZwG11V1JJijKkWXhVkFOoq5/VAjkjejQb0BcQRadBQTN2",
149+
"yziIrxFWSgfnekyIa3j7Zfa4WIPL3LLfrjBg6xOjgYYwgYMwFAYDVR0RBA0wC4IJ",
150+
"bG9jYWxob3N0MAkGA1UdEwQCMAAwCwYDVR0PBAQDAgWgMBMGA1UdJQQMMAoGCCsG",
151+
"AQUFBwMBMB0GA1UdDgQWBBQecwDdu1wYyfkDKm8RsEBUzl3eijAfBgNVHSMEGDAW",
152+
"gBSDBIRsth0upOni4RY3cKpjguQeBTAKBggqhkjOPQQDAgNIADBFAiBhbBIAKEID",
153+
"6F0vribdLyPkJbMoNNOSK79hdrpdnYyTZwIhALwHMlmusqbK2lzH1l6bwbpldrfb",
154+
"xiQgxb9SrOgsPmwz",
155+
);
156+
157+
const TEST_SERVER_KEY_DER_BASE64: &str = concat!(
158+
"MHcCAQEEIB39TDCC+qOWh5pRcqaGvaAY2Zx9hYWgREB/QoFehxH2oAoGCCqGSM49",
159+
"AwEHoUQDQgAEWpnAbXVXUkmKMqRZeFWQU6irn9UCOSN6NBvQFxBFp0FBM3bLOIiv",
160+
"EVZKB+d6TIhrePtl9rhYg8vcst+uMGDrEw==",
161+
);
162+
163+
type LocalHttpsGcsServer = (String, JoinHandle<anyhow::Result<()>>);
164+
120165
#[test]
121166
fn test_parse_google_uri() {
122167
assert!(parse_google_uri(&Uri::for_test("gs://")).is_none());
@@ -134,4 +179,96 @@ mod tests {
134179
assert_eq!(bucket, "test-bucket");
135180
assert_eq!(prefix.to_str().unwrap(), "indexes");
136181
}
182+
183+
#[tokio::test]
184+
#[cfg_attr(not(feature = "ci-test"), ignore)]
185+
async fn test_gcs_https_endpoint_can_read_from_local_server() -> anyhow::Result<()> {
186+
let (endpoint, server_task) = start_local_https_gcs_server().await?;
187+
let ca_cert_der = decode_test_der(TEST_CA_CERT_DER_BASE64)?;
188+
let ca_cert = reqwest_013::Certificate::from_der(&ca_cert_der)?;
189+
let reqwest_client = reqwest_013::Client::builder()
190+
.no_proxy()
191+
.tls_certs_only([ca_cert])
192+
.build()?;
193+
194+
let cfg = opendal::services::Gcs::default()
195+
.bucket("quickwit-test-bucket")
196+
.endpoint(&endpoint)
197+
.allow_anonymous()
198+
.disable_config_load()
199+
.disable_vm_metadata();
200+
let op = Operator::new(cfg)?
201+
.layer(HttpClientLayer::new(HttpClient::with(reqwest_client)))
202+
.finish();
203+
204+
let bytes = op.read_with("hello.txt").range(0..2).await?.to_bytes();
205+
assert_eq!(bytes.as_ref(), b"ok");
206+
server_task.await??;
207+
Ok(())
208+
}
209+
210+
async fn start_local_https_gcs_server() -> anyhow::Result<LocalHttpsGcsServer> {
211+
let cert_chain = vec![CertificateDer::from(decode_test_der(
212+
TEST_SERVER_CERT_DER_BASE64,
213+
)?)];
214+
let private_key = PrivateKeyDer::Sec1(PrivateSec1KeyDer::from(decode_test_der(
215+
TEST_SERVER_KEY_DER_BASE64,
216+
)?));
217+
let tls_config = rustls::ServerConfig::builder()
218+
.with_no_client_auth()
219+
.with_single_cert(cert_chain, private_key)?;
220+
let tls_acceptor = tokio_rustls::TlsAcceptor::from(Arc::new(tls_config));
221+
let listener = TcpListener::bind(("127.0.0.1", 0)).await?;
222+
let endpoint = format!("https://localhost:{}", listener.local_addr()?.port());
223+
224+
let server_task = tokio::spawn(async move {
225+
let (stream, _) = listener.accept().await?;
226+
let mut stream = tls_acceptor.accept(stream).await?;
227+
let mut request = Vec::new();
228+
let mut buffer = [0u8; 1024];
229+
loop {
230+
let bytes_read = stream.read(&mut buffer).await?;
231+
if bytes_read == 0 {
232+
break;
233+
}
234+
request.extend_from_slice(&buffer[..bytes_read]);
235+
if request.windows(4).any(|window| window == b"\r\n\r\n") {
236+
break;
237+
}
238+
}
239+
let request = String::from_utf8_lossy(&request);
240+
assert!(
241+
request.starts_with(
242+
"GET /storage/v1/b/quickwit-test-bucket/o/hello.txt?alt=media HTTP/"
243+
),
244+
"unexpected GCS request line: {request}"
245+
);
246+
assert!(
247+
request
248+
.to_ascii_lowercase()
249+
.contains("\r\nrange: bytes=0-1\r\n"),
250+
"expected range read request header: {request}"
251+
);
252+
253+
stream
254+
.write_all(
255+
b"HTTP/1.1 206 Partial Content\r\n\
256+
Content-Length: 2\r\n\
257+
Content-Range: bytes 0-1/2\r\n\
258+
Accept-Ranges: bytes\r\n\
259+
Connection: close\r\n\
260+
\r\n\
261+
ok",
262+
)
263+
.await?;
264+
stream.shutdown().await?;
265+
Ok(())
266+
});
267+
268+
Ok((endpoint, server_task))
269+
}
270+
271+
fn decode_test_der(base64_der: &str) -> anyhow::Result<Vec<u8>> {
272+
Ok(base64::engine::general_purpose::STANDARD.decode(base64_der)?)
273+
}
137274
}

0 commit comments

Comments
 (0)