Skip to content

Commit 2b4e8a8

Browse files
committed
test(storage): cover GCS HTTPS OpenDAL TLS
1 parent 0b26a07 commit 2b4e8a8

4 files changed

Lines changed: 200 additions & 3 deletions

File tree

quickwit/Cargo.lock

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

quickwit/quickwit-storage/Cargo.toml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,13 @@ reqwest = { workspace = true, optional = true }
6262
http = { workspace = true }
6363
mockall = { workspace = true }
6464
proptest = { workspace = true }
65+
# Match OpenDAL's internal reqwest major. `default-features = false` is
66+
# intentional: the HTTPS regression test should get TLS only from OpenDAL's
67+
# `reqwest-rustls-tls` feature, not from this dev-dependency.
68+
reqwest-013 = { package = "reqwest", version = "0.13", default-features = false }
69+
rustls = { workspace = true }
6570
tokio = { workspace = true }
71+
tokio-rustls = { workspace = true }
6672
tracing-subscriber = { workspace = true }
6773

6874
aws-sdk-s3 = { workspace = true }

quickwit/quickwit-storage/src/opendal_storage/base.rs

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,12 +60,30 @@ impl OpendalStorage {
6060
cfg: opendal::services::Gcs,
6161
) -> Result<Self, StorageResolverError> {
6262
let op = Operator::new(cfg)?.finish();
63-
Ok(Self {
63+
Ok(Self::from_operator(uri, op))
64+
}
65+
66+
fn from_operator(uri: Uri, op: Operator) -> Self {
67+
Self {
6468
uri,
6569
op,
6670
// limits are the same as on S3
6771
multipart_policy: MultiPartPolicy::default(),
68-
})
72+
}
73+
}
74+
75+
#[cfg(test)]
76+
// Lets local HTTPS tests trust a private CA without changing global trust,
77+
// while still using Quickwit's GCS storage construction and read path.
78+
pub(super) fn new_google_cloud_storage_with_http_client_for_test(
79+
uri: Uri,
80+
cfg: opendal::services::Gcs,
81+
http_client: opendal::raw::HttpClient,
82+
) -> Result<Self, StorageResolverError> {
83+
let op = Operator::new(cfg)?
84+
.layer(opendal::layers::HttpClientLayer::new(http_client))
85+
.finish();
86+
Ok(Self::from_operator(uri, op))
6987
}
7088

7189
#[cfg(feature = "integration-testsuite")]

quickwit/quickwit-storage/src/opendal_storage/google_cloud_storage.rs

Lines changed: 171 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,9 +113,60 @@ fn parse_google_uri(uri: &Uri) -> Option<(String, PathBuf)> {
113113

114114
#[cfg(test)]
115115
mod tests {
116+
use std::collections::BTreeMap;
117+
use std::path::Path;
118+
use std::sync::Arc;
119+
120+
use base64::Engine;
121+
use opendal::raw::HttpClient;
116122
use quickwit_common::uri::Uri;
123+
use rustls::pki_types::{CertificateDer, PrivateKeyDer, PrivateSec1KeyDer};
124+
use tokio::io::{AsyncReadExt, AsyncWriteExt};
125+
use tokio::net::TcpListener;
126+
use tokio::task::JoinHandle;
127+
128+
use super::{OpendalStorage, parse_google_uri};
129+
use crate::Storage;
130+
131+
// Test-only CA and server certificate for 127.0.0.1/localhost, valid from
132+
// 2020 to 3020. The client trusts only this CA, so the test never depends
133+
// on the host root store.
134+
// To regenerate, use `openssl ecparam -name prime256v1` keys, sign a CA
135+
// and server cert with `-not_before 20200101000000Z`,
136+
// `-not_after 30200101000000Z`, and server SANs
137+
// `DNS:localhost,IP:127.0.0.1`, then base64-encode the DER outputs.
138+
const TEST_CA_CERT_DER_BASE64: &str = concat!(
139+
"MIIBizCCATGgAwIBAgICEAEwCgYIKoZIzj0EAwIwGzEZMBcGA1UEAwwQUXVpY2t3",
140+
"aXQgVGVzdCBDQTAgFw0yMDAxMDEwMDAwMDBaGA8zMDIwMDEwMTAwMDAwMFowGzEZ",
141+
"MBcGA1UEAwwQUXVpY2t3aXQgVGVzdCBDQTBZMBMGByqGSM49AgEGCCqGSM49AwEH",
142+
"A0IABH+1ZvivhT0E5FydtoMGBkyenql8XPyFTPBhTfHycTjfTWJiETjILGadPLKY",
143+
"OZJky8ThPZUpKAux5M4SaazdX1WjYzBhMA8GA1UdEwEB/wQFMAMBAf8wDgYDVR0P",
144+
"AQH/BAQDAgEGMB0GA1UdDgQWBBQmOMvIHAegmBHwvdVGyguC/57/4zAfBgNVHSME",
145+
"GDAWgBQmOMvIHAegmBHwvdVGyguC/57/4zAKBggqhkjOPQQDAgNIADBFAiEAnE7M",
146+
"lcB35MOr+7WKDAhu/c6ZrpgRz+chqqfc3g5YTOECIEDmoPkOigkulNON67opCPaT",
147+
"y+MQhMA9KDEzE3t/CY9V",
148+
);
149+
150+
const TEST_SERVER_CERT_DER_BASE64: &str = concat!(
151+
"MIIBszCCAVqgAwIBAgICEAIwCgYIKoZIzj0EAwIwGzEZMBcGA1UEAwwQUXVpY2t3",
152+
"aXQgVGVzdCBDQTAgFw0yMDAxMDEwMDAwMDBaGA8zMDIwMDEwMTAwMDAwMFowFDES",
153+
"MBAGA1UEAwwJbG9jYWxob3N0MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEowPj",
154+
"3vpXPAkf04MNeGhaDBvtwMsmeipV57lSWx5K2FwXH7JDmt74k4HmQFB6JESy6FbM",
155+
"tAVhivr7kG5dWKK/sqOBkjCBjzAMBgNVHRMBAf8EAjAAMA4GA1UdDwEB/wQEAwIH",
156+
"gDATBgNVHSUEDDAKBggrBgEFBQcDATAaBgNVHREEEzARgglsb2NhbGhvc3SHBH8A",
157+
"AAEwHQYDVR0OBBYEFES7BP5uQpa3+PktDVlgc9zYGIqDMB8GA1UdIwQYMBaAFCY4",
158+
"y8gcB6CYEfC91UbKC4L/nv/jMAoGCCqGSM49BAMCA0cAMEQCIGtIKEWRn7ec82TY",
159+
"s1jrUoKWnhzRDbZTUtvXORk190rHAiAosxVgu45TjDyuROKU39TxJ1z+JObhNGk8",
160+
"J6PkuOTFqg==",
161+
);
162+
163+
const TEST_SERVER_KEY_DER_BASE64: &str = concat!(
164+
"MHcCAQEEIIql19flBaZJE16Ivs8GjdJHedhuU5YFZgvIn4WaOs6HoAoGCCqGSM49",
165+
"AwEHoUQDQgAEowPj3vpXPAkf04MNeGhaDBvtwMsmeipV57lSWx5K2FwXH7JDmt74",
166+
"k4HmQFB6JESy6FbMtAVhivr7kG5dWKK/sg==",
167+
);
117168

118-
use super::parse_google_uri;
169+
type LocalHttpsGcsServer = (String, JoinHandle<anyhow::Result<()>>);
119170

120171
#[test]
121172
fn test_parse_google_uri() {
@@ -134,4 +185,123 @@ mod tests {
134185
assert_eq!(bucket, "test-bucket");
135186
assert_eq!(prefix.to_str().unwrap(), "indexes");
136187
}
188+
189+
#[tokio::test]
190+
async fn test_gcs_storage_get_slice_over_https_with_verified_tls() -> anyhow::Result<()> {
191+
let (endpoint, server_task) = start_local_https_gcs_server().await?;
192+
let ca_cert_der = decode_test_der(TEST_CA_CERT_DER_BASE64)?;
193+
let ca_cert = reqwest_013::Certificate::from_der(&ca_cert_der)?;
194+
let reqwest_client = reqwest_013::Client::builder()
195+
.no_proxy()
196+
.tls_certs_only([ca_cert])
197+
.build()?;
198+
199+
let cfg = opendal::services::Gcs::default()
200+
.bucket("quickwit-test-bucket")
201+
.endpoint(&endpoint)
202+
.allow_anonymous()
203+
.disable_config_load()
204+
.disable_vm_metadata();
205+
let storage = OpendalStorage::new_google_cloud_storage_with_http_client_for_test(
206+
Uri::for_test("gs://quickwit-test-bucket"),
207+
cfg,
208+
HttpClient::with(reqwest_client),
209+
)?;
210+
211+
let bytes = storage.get_slice(Path::new("hello.txt"), 0..2).await?;
212+
assert_eq!(bytes.as_slice(), b"ok");
213+
server_task.await??;
214+
Ok(())
215+
}
216+
217+
async fn start_local_https_gcs_server() -> anyhow::Result<LocalHttpsGcsServer> {
218+
let cert_chain = vec![CertificateDer::from(decode_test_der(
219+
TEST_SERVER_CERT_DER_BASE64,
220+
)?)];
221+
let private_key = PrivateKeyDer::Sec1(PrivateSec1KeyDer::from(decode_test_der(
222+
TEST_SERVER_KEY_DER_BASE64,
223+
)?));
224+
let tls_config = rustls::ServerConfig::builder()
225+
.with_no_client_auth()
226+
.with_single_cert(cert_chain, private_key)?;
227+
let tls_acceptor = tokio_rustls::TlsAcceptor::from(Arc::new(tls_config));
228+
let listener = TcpListener::bind(("127.0.0.1", 0)).await?;
229+
let endpoint = format!("https://127.0.0.1:{}", listener.local_addr()?.port());
230+
231+
let server_task = tokio::spawn(async move {
232+
let (stream, _) = listener.accept().await?;
233+
let mut stream = tls_acceptor.accept(stream).await?;
234+
let mut request = Vec::new();
235+
let mut buffer = [0u8; 1024];
236+
loop {
237+
let bytes_read = stream.read(&mut buffer).await?;
238+
if bytes_read == 0 {
239+
break;
240+
}
241+
request.extend_from_slice(&buffer[..bytes_read]);
242+
if request.windows(4).any(|window| window == b"\r\n\r\n") {
243+
break;
244+
}
245+
}
246+
let request = String::from_utf8_lossy(&request);
247+
let (header_block, _) = request
248+
.split_once("\r\n\r\n")
249+
.expect("request must contain HTTP header terminator");
250+
let mut header_lines = header_block.lines();
251+
let request_line = header_lines.next().expect("request line must be present");
252+
let mut request_line_parts = request_line.split_whitespace();
253+
let method = request_line_parts.next();
254+
let target = request_line_parts.next();
255+
let version = request_line_parts.next();
256+
assert_eq!(
257+
method,
258+
Some("GET"),
259+
"unexpected request line: {request_line}"
260+
);
261+
assert_eq!(
262+
target,
263+
Some("/storage/v1/b/quickwit-test-bucket/o/hello.txt?alt=media"),
264+
"unexpected GCS request target: {request_line}"
265+
);
266+
assert!(
267+
version.is_some_and(|version| version.starts_with("HTTP/")),
268+
"unexpected HTTP version in request line: {request_line}"
269+
);
270+
assert_eq!(
271+
request_line_parts.next(),
272+
None,
273+
"unexpected extra request line segment: {request_line}"
274+
);
275+
276+
let headers: BTreeMap<String, String> = header_lines
277+
.filter_map(|line| line.split_once(':'))
278+
.map(|(name, value)| (name.to_ascii_lowercase(), value.trim().to_string()))
279+
.collect();
280+
assert_eq!(
281+
headers.get("range").map(String::as_str),
282+
Some("bytes=0-1"),
283+
"expected range read request header: {request}"
284+
);
285+
286+
stream
287+
.write_all(
288+
b"HTTP/1.1 206 Partial Content\r\n\
289+
Content-Length: 2\r\n\
290+
Content-Range: bytes 0-1/2\r\n\
291+
Accept-Ranges: bytes\r\n\
292+
Connection: close\r\n\
293+
\r\n\
294+
ok",
295+
)
296+
.await?;
297+
stream.shutdown().await?;
298+
Ok(())
299+
});
300+
301+
Ok((endpoint, server_task))
302+
}
303+
304+
fn decode_test_der(base64_der: &str) -> anyhow::Result<Vec<u8>> {
305+
Ok(base64::engine::general_purpose::STANDARD.decode(base64_der)?)
306+
}
137307
}

0 commit comments

Comments
 (0)