Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions quickwit/quickwit-storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,13 @@ reqwest = { workspace = true, optional = true }
http = { workspace = true }
mockall = { workspace = true }
proptest = { workspace = true }
# Match OpenDAL's internal reqwest major. `default-features = false` is
# intentional: the HTTPS regression test should get TLS only from OpenDAL's
# `reqwest-rustls-tls` feature, not from this dev-dependency.
reqwest-013 = { package = "reqwest", version = "0.13", default-features = false }
rustls = { workspace = true }
tokio = { workspace = true }
tokio-rustls = { workspace = true }
tracing-subscriber = { workspace = true }

aws-sdk-s3 = { workspace = true }
Expand Down
22 changes: 20 additions & 2 deletions quickwit/quickwit-storage/src/opendal_storage/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,30 @@ impl OpendalStorage {
cfg: opendal::services::Gcs,
) -> Result<Self, StorageResolverError> {
let op = Operator::new(cfg)?.finish();
Ok(Self {
Ok(Self::from_operator(uri, op))
}

/// Wraps an already-built OpenDAL `Operator` into a storage for `uri`, using the
/// default multipart policy. Shared by the constructors so they differ only in how
/// the operator itself is assembled.
fn from_operator(uri: Uri, op: Operator) -> Self {
Self {
uri,
op,
// limits are the same as on S3
multipart_policy: MultiPartPolicy::default(),
})
}
}

/// Test-only GCS constructor that routes every request of the operator through
/// the supplied `http_client`.
///
/// This lets a local HTTPS test trust a private CA without touching global
/// trust settings, while the storage is still assembled by `from_operator`
/// like the regular GCS constructor, so the normal read path is exercised.
///
/// Returns an error if the operator cannot be built from `cfg`.
#[cfg(test)]
pub(super) fn new_google_cloud_storage_with_http_client_for_test(
    uri: Uri,
    cfg: opendal::services::Gcs,
    http_client: opendal::raw::HttpClient,
) -> Result<Self, StorageResolverError> {
    let operator_builder = Operator::new(cfg)?;
    // Swap the operator's HTTP client for the caller-provided one.
    let http_client_layer = opendal::layers::HttpClientLayer::new(http_client);
    let operator = operator_builder.layer(http_client_layer).finish();
    Ok(Self::from_operator(uri, operator))
}

#[cfg(feature = "integration-testsuite")]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,58 @@ fn parse_google_uri(uri: &Uri) -> Option<(String, PathBuf)> {

#[cfg(test)]
mod tests {
use std::collections::BTreeMap;
use std::path::Path;
use std::sync::Arc;

use base64::Engine;
use opendal::raw::HttpClient;
use quickwit_common::uri::Uri;
use rustls::pki_types::{CertificateDer, PrivateKeyDer, PrivateSec1KeyDer};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;
use tokio::task::JoinHandle;

use super::{OpendalStorage, parse_google_uri};
use crate::Storage;

// Test-only CA and server certificate for 127.0.0.1/localhost, valid from
// 2020 to 3020. The client trusts only this CA, so the test never depends
// on the host root store.
// Regenerate with:
// `bash -c 'set -euo pipefail; d=$(mktemp -d); trap "rm -rf $d" EXIT; openssl req -x509 -newkey ec -pkeyopt ec_paramgen_curve:prime256v1 -nodes -subj "/CN=Quickwit Test CA" -set_serial 0x1001 -not_before 20200101000000Z -not_after 30200101000000Z -extensions v3_ca -config <(printf "%s\n" "[req]" "distinguished_name=dn" "[dn]" "[v3_ca]" "basicConstraints=critical,CA:true" "keyUsage=critical,keyCertSign,cRLSign" "subjectKeyIdentifier=hash" "authorityKeyIdentifier=keyid:always") -keyout "$d/ca.key" -out "$d/ca.crt" 2>/dev/null; openssl req -x509 -newkey ec -pkeyopt ec_paramgen_curve:prime256v1 -nodes -subj "/CN=localhost" -CA "$d/ca.crt" -CAkey "$d/ca.key" -set_serial 0x1002 -not_before 20200101000000Z -not_after 30200101000000Z -extensions v3_server -config <(printf "%s\n" "[req]" "distinguished_name=dn" "[dn]" "[v3_server]" "basicConstraints=critical,CA:false" "keyUsage=critical,digitalSignature" "extendedKeyUsage=serverAuth" "subjectAltName=DNS:localhost,IP:127.0.0.1" "subjectKeyIdentifier=hash" "authorityKeyIdentifier=keyid,issuer") -keyout "$d/server.key" -out "$d/server.crt" 2>/dev/null; printf "CA_DER="; openssl x509 -in "$d/ca.crt" -outform der | base64 | tr -d "\n"; printf "\nSERVER_DER="; openssl x509 -in "$d/server.crt" -outform der | base64 | tr -d "\n"; printf "\nSERVER_KEY_DER="; openssl ec -in "$d/server.key" -outform der 2>/dev/null | base64 | tr -d "\n"; printf "\n"'`
/// Base64 of the DER-encoded test CA certificate (`CA_DER` in the regeneration
/// command above). The HTTPS test builds its client trust from this certificate.
const TEST_CA_CERT_DER_BASE64: &str = concat!(
"MIIBizCCATGgAwIBAgICEAEwCgYIKoZIzj0EAwIwGzEZMBcGA1UEAwwQUXVpY2t3",
"aXQgVGVzdCBDQTAgFw0yMDAxMDEwMDAwMDBaGA8zMDIwMDEwMTAwMDAwMFowGzEZ",
"MBcGA1UEAwwQUXVpY2t3aXQgVGVzdCBDQTBZMBMGByqGSM49AgEGCCqGSM49AwEH",
"A0IABH+1ZvivhT0E5FydtoMGBkyenql8XPyFTPBhTfHycTjfTWJiETjILGadPLKY",
"OZJky8ThPZUpKAux5M4SaazdX1WjYzBhMA8GA1UdEwEB/wQFMAMBAf8wDgYDVR0P",
"AQH/BAQDAgEGMB0GA1UdDgQWBBQmOMvIHAegmBHwvdVGyguC/57/4zAfBgNVHSME",
"GDAWgBQmOMvIHAegmBHwvdVGyguC/57/4zAKBggqhkjOPQQDAgNIADBFAiEAnE7M",
"lcB35MOr+7WKDAhu/c6ZrpgRz+chqqfc3g5YTOECIEDmoPkOigkulNON67opCPaT",
"y+MQhMA9KDEzE3t/CY9V",
);

/// Base64 of the DER-encoded server certificate (`SERVER_DER` in the regeneration
/// command). The local HTTPS server presents it as its single-certificate chain.
const TEST_SERVER_CERT_DER_BASE64: &str = concat!(
"MIIBszCCAVqgAwIBAgICEAIwCgYIKoZIzj0EAwIwGzEZMBcGA1UEAwwQUXVpY2t3",
"aXQgVGVzdCBDQTAgFw0yMDAxMDEwMDAwMDBaGA8zMDIwMDEwMTAwMDAwMFowFDES",
"MBAGA1UEAwwJbG9jYWxob3N0MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEowPj",
"3vpXPAkf04MNeGhaDBvtwMsmeipV57lSWx5K2FwXH7JDmt74k4HmQFB6JESy6FbM",
"tAVhivr7kG5dWKK/sqOBkjCBjzAMBgNVHRMBAf8EAjAAMA4GA1UdDwEB/wQEAwIH",
"gDATBgNVHSUEDDAKBggrBgEFBQcDATAaBgNVHREEEzARgglsb2NhbGhvc3SHBH8A",
"AAEwHQYDVR0OBBYEFES7BP5uQpa3+PktDVlgc9zYGIqDMB8GA1UdIwQYMBaAFCY4",
"y8gcB6CYEfC91UbKC4L/nv/jMAoGCCqGSM49BAMCA0cAMEQCIGtIKEWRn7ec82TY",
"s1jrUoKWnhzRDbZTUtvXORk190rHAiAosxVgu45TjDyuROKU39TxJ1z+JObhNGk8",
"J6PkuOTFqg==",
);

/// Base64 of the DER-encoded server private key (`SERVER_KEY_DER` in the
/// regeneration command). It is loaded as a SEC1 key by the local HTTPS server.
const TEST_SERVER_KEY_DER_BASE64: &str = concat!(
"MHcCAQEEIIql19flBaZJE16Ivs8GjdJHedhuU5YFZgvIn4WaOs6HoAoGCCqGSM49",
"AwEHoUQDQgAEowPj3vpXPAkf04MNeGhaDBvtwMsmeipV57lSWx5K2FwXH7JDmt74",
"k4HmQFB6JESy6FbMtAVhivr7kG5dWKK/sg==",
);

use super::parse_google_uri;
/// What `start_local_https_gcs_server` hands back: the `https://127.0.0.1:<port>`
/// endpoint string and the join handle of the spawned server task.
type LocalHttpsGcsServer = (String, JoinHandle<anyhow::Result<()>>);

#[test]
fn test_parse_google_uri() {
Expand All @@ -134,4 +183,123 @@ mod tests {
assert_eq!(bucket, "test-bucket");
assert_eq!(prefix.to_str().unwrap(), "indexes");
}

/// Reads a 2-byte slice of `hello.txt` through the GCS storage over HTTPS, with an
/// HTTP client whose TLS trust is built from the embedded test CA certificate.
///
/// The local server task is joined at the end so that a failed server-side
/// assertion or I/O error fails the test as well.
#[tokio::test]
async fn test_gcs_storage_get_slice_over_https_with_verified_tls() -> anyhow::Result<()> {
    let (endpoint, server_task) = start_local_https_gcs_server().await?;

    // TLS trust is configured solely from the embedded test CA.
    let trusted_ca = {
        let ca_der = decode_test_der(TEST_CA_CERT_DER_BASE64)?;
        reqwest_013::Certificate::from_der(&ca_der)?
    };
    let tls_client = reqwest_013::Client::builder()
        .no_proxy()
        .tls_certs_only([trusted_ca])
        .build()?;

    // Anonymous GCS configuration pointed at the local server.
    let gcs_config = opendal::services::Gcs::default()
        .bucket("quickwit-test-bucket")
        .endpoint(&endpoint)
        .allow_anonymous()
        .disable_config_load()
        .disable_vm_metadata();

    let storage = OpendalStorage::new_google_cloud_storage_with_http_client_for_test(
        Uri::for_test("gs://quickwit-test-bucket"),
        gcs_config,
        HttpClient::with(tls_client),
    )?;

    let object_path = Path::new("hello.txt");
    let payload = storage.get_slice(object_path, 0..2).await?;
    assert_eq!(payload.as_slice(), b"ok");

    // First `?` surfaces a panicked/cancelled task; the inner result is returned as is.
    server_task.await?
}

/// Starts a one-shot HTTPS server on an ephemeral `127.0.0.1` port that stands in
/// for the GCS endpoint during a single ranged object read.
///
/// Returns the `https://127.0.0.1:<port>` endpoint and the handle of the spawned
/// server task. The task accepts one TCP connection, performs the TLS handshake
/// with the embedded server certificate and key, reads the request head, asserts
/// that it is the expected ranged `GET`, answers with a 2-byte `206` response and
/// shuts the stream down. Assertion failures panic inside the task and therefore
/// show up when the handle is awaited.
async fn start_local_https_gcs_server() -> anyhow::Result<LocalHttpsGcsServer> {
    let server_cert_der = decode_test_der(TEST_SERVER_CERT_DER_BASE64)?;
    let cert_chain = vec![CertificateDer::from(server_cert_der)];
    let server_key_der = decode_test_der(TEST_SERVER_KEY_DER_BASE64)?;
    let private_key = PrivateKeyDer::Sec1(PrivateSec1KeyDer::from(server_key_der));
    let server_config = rustls::ServerConfig::builder()
        .with_no_client_auth()
        .with_single_cert(cert_chain, private_key)?;
    let acceptor = tokio_rustls::TlsAcceptor::from(Arc::new(server_config));

    // Port 0 lets the OS pick a free port; the endpoint is derived from it.
    let listener = TcpListener::bind(("127.0.0.1", 0)).await?;
    let port = listener.local_addr()?.port();
    let endpoint = format!("https://127.0.0.1:{}", port);

    let server_task = tokio::spawn(async move {
        let (tcp_stream, _peer_addr) = listener.accept().await?;
        let mut stream = acceptor.accept(tcp_stream).await?;

        // Keep reading until the blank line that ends the HTTP head shows up, or
        // until the peer closes the connection.
        let mut raw_request: Vec<u8> = Vec::new();
        let mut chunk = [0u8; 1024];
        while !raw_request.windows(4).any(|window| window == b"\r\n\r\n") {
            let chunk_len = stream.read(&mut chunk).await?;
            if chunk_len == 0 {
                break;
            }
            raw_request.extend_from_slice(&chunk[..chunk_len]);
        }

        let request = String::from_utf8_lossy(&raw_request);
        let header_block = request
            .split_once("\r\n\r\n")
            .map(|(head, _rest)| head)
            .expect("request must contain HTTP header terminator");
        let mut header_lines = header_block.lines();
        let request_line = header_lines.next().expect("request line must be present");

        // Request line: exactly `<method> <target> <version>`.
        let request_line_parts: Vec<&str> = request_line.split_whitespace().collect();
        let method = request_line_parts.first().copied();
        let target = request_line_parts.get(1).copied();
        let version = request_line_parts.get(2).copied();
        assert_eq!(
            method,
            Some("GET"),
            "unexpected request line: {request_line}"
        );
        assert_eq!(
            target,
            Some("/storage/v1/b/quickwit-test-bucket/o/hello.txt?alt=media"),
            "unexpected GCS request target: {request_line}"
        );
        assert!(
            matches!(version, Some(version) if version.starts_with("HTTP/")),
            "unexpected HTTP version in request line: {request_line}"
        );
        assert_eq!(
            request_line_parts.get(3).copied(),
            None,
            "unexpected extra request line segment: {request_line}"
        );

        // Remaining lines are headers; names are lower-cased for lookup and lines
        // without a colon are ignored.
        let mut headers: BTreeMap<String, String> = BTreeMap::new();
        for header_line in header_lines {
            if let Some((name, value)) = header_line.split_once(':') {
                headers.insert(name.to_ascii_lowercase(), value.trim().to_string());
            }
        }
        assert_eq!(
            headers.get("range").map(String::as_str),
            Some("bytes=0-1"),
            "expected range read request header: {request}"
        );

        let response: &[u8] = b"HTTP/1.1 206 Partial Content\r\n\
            Content-Length: 2\r\n\
            Content-Range: bytes 0-1/2\r\n\
            Accept-Ranges: bytes\r\n\
            Connection: close\r\n\
            \r\n\
            ok";
        stream.write_all(response).await?;
        stream.shutdown().await?;
        Ok(())
    });

    Ok((endpoint, server_task))
}

/// Decodes one of the embedded base64 test fixtures (standard alphabet) into raw
/// DER bytes, returning the decode error if the input is not valid base64.
fn decode_test_der(base64_der: &str) -> anyhow::Result<Vec<u8>> {
    let der_bytes = base64::engine::general_purpose::STANDARD.decode(base64_der)?;
    Ok(der_bytes)
}
}