@@ -113,10 +113,55 @@ fn parse_google_uri(uri: &Uri) -> Option<(String, PathBuf)> {
113113
114114#[ cfg( test) ]
115115mod tests {
116+ use std:: sync:: Arc ;
117+
118+ use base64:: Engine ;
119+ use opendal:: Operator ;
120+ use opendal:: layers:: HttpClientLayer ;
121+ use opendal:: raw:: HttpClient ;
116122 use quickwit_common:: uri:: Uri ;
123+ use rustls:: pki_types:: { CertificateDer , PrivateKeyDer , PrivateSec1KeyDer } ;
124+ use tokio:: io:: { AsyncReadExt , AsyncWriteExt } ;
125+ use tokio:: net:: TcpListener ;
126+ use tokio:: task:: JoinHandle ;
117127
118128 use super :: parse_google_uri;
119129
130+ // Test-only localhost certificate chain. The client below trusts only this
131+ // embedded CA, so the test never depends on the host root store.
132+ const TEST_CA_CERT_DER_BASE64 : & str = concat ! (
133+ "MIIBizCCATGgAwIBAgIURn3X2TGz13Wv1/p3x8S3y1ALQwkwCgYIKoZIzj0EAwIw" ,
134+ "GzEZMBcGA1UEAwwQUXVpY2t3aXQgVGVzdCBDQTAeFw0yNjA1MzAxMTE4NDJaFw0z" ,
135+ "NjA1MjcxMTE4NDJaMBsxGTAXBgNVBAMMEFF1aWNrd2l0IFRlc3QgQ0EwWTATBgcq" ,
136+ "hkjOPQIBBggqhkjOPQMBBwNCAATyMe2ARkjYkk+QnWZA8T+A99xLcHhMRdcMBElb" ,
137+ "NFYf2nccWkaaqEnUGOIe6Y6ODehYX5dpEWpwkXQ20p52mtDLo1MwUTAdBgNVHQ4E" ,
138+ "FgQUgwSEbLYdLqTp4uEWN3CqY4LkHgUwHwYDVR0jBBgwFoAUgwSEbLYdLqTp4uEW" ,
139+ "N3CqY4LkHgUwDwYDVR0TAQH/BAUwAwEB/zAKBggqhkjOPQQDAgNIADBFAiAvxQlF" ,
140+ "s1tG0Qs/rY9WMcKdvCiOyW044KePy4t9jzkhsgIhALBerDKsWc80bgBYWetpIZVS" ,
141+ "FdrixGuySBCdAG9w0rWn" ,
142+ ) ;
143+
144+ const TEST_SERVER_CERT_DER_BASE64 : & str = concat ! (
145+ "MIIBuDCCAV6gAwIBAgIUIwRonRLHFnBmsr8U5Ytq9LFW6ZMwCgYIKoZIzj0EAwIw" ,
146+ "GzEZMBcGA1UEAwwQUXVpY2t3aXQgVGVzdCBDQTAeFw0yNjA1MzAxMTE4NDJaFw0z" ,
147+ "NjA1MjcxMTE4NDJaMBQxEjAQBgNVBAMMCWxvY2FsaG9zdDBZMBMGByqGSM49AgEG" ,
148+ "CCqGSM49AwEHA0IABFqZwG11V1JJijKkWXhVkFOoq5/VAjkjejQb0BcQRadBQTN2" ,
149+ "yziIrxFWSgfnekyIa3j7Zfa4WIPL3LLfrjBg6xOjgYYwgYMwFAYDVR0RBA0wC4IJ" ,
150+ "bG9jYWxob3N0MAkGA1UdEwQCMAAwCwYDVR0PBAQDAgWgMBMGA1UdJQQMMAoGCCsG" ,
151+ "AQUFBwMBMB0GA1UdDgQWBBQecwDdu1wYyfkDKm8RsEBUzl3eijAfBgNVHSMEGDAW" ,
152+ "gBSDBIRsth0upOni4RY3cKpjguQeBTAKBggqhkjOPQQDAgNIADBFAiBhbBIAKEID" ,
153+ "6F0vribdLyPkJbMoNNOSK79hdrpdnYyTZwIhALwHMlmusqbK2lzH1l6bwbpldrfb" ,
154+ "xiQgxb9SrOgsPmwz" ,
155+ ) ;
156+
157+ const TEST_SERVER_KEY_DER_BASE64 : & str = concat ! (
158+ "MHcCAQEEIB39TDCC+qOWh5pRcqaGvaAY2Zx9hYWgREB/QoFehxH2oAoGCCqGSM49" ,
159+ "AwEHoUQDQgAEWpnAbXVXUkmKMqRZeFWQU6irn9UCOSN6NBvQFxBFp0FBM3bLOIiv" ,
160+ "EVZKB+d6TIhrePtl9rhYg8vcst+uMGDrEw==" ,
161+ ) ;
162+
163+ type LocalHttpsGcsServer = ( String , JoinHandle < anyhow:: Result < ( ) > > ) ;
164+
120165 #[ test]
121166 fn test_parse_google_uri ( ) {
122167 assert ! ( parse_google_uri( & Uri :: for_test( "gs://" ) ) . is_none( ) ) ;
@@ -134,4 +179,96 @@ mod tests {
134179 assert_eq ! ( bucket, "test-bucket" ) ;
135180 assert_eq ! ( prefix. to_str( ) . unwrap( ) , "indexes" ) ;
136181 }
182+
183+ #[ tokio:: test]
184+ #[ cfg_attr( not( feature = "ci-test" ) , ignore) ]
185+ async fn test_gcs_https_endpoint_can_read_from_local_server ( ) -> anyhow:: Result < ( ) > {
186+ let ( endpoint, server_task) = start_local_https_gcs_server ( ) . await ?;
187+ let ca_cert_der = decode_test_der ( TEST_CA_CERT_DER_BASE64 ) ?;
188+ let ca_cert = reqwest_013:: Certificate :: from_der ( & ca_cert_der) ?;
189+ let reqwest_client = reqwest_013:: Client :: builder ( )
190+ . no_proxy ( )
191+ . tls_certs_only ( [ ca_cert] )
192+ . build ( ) ?;
193+
194+ let cfg = opendal:: services:: Gcs :: default ( )
195+ . bucket ( "quickwit-test-bucket" )
196+ . endpoint ( & endpoint)
197+ . allow_anonymous ( )
198+ . disable_config_load ( )
199+ . disable_vm_metadata ( ) ;
200+ let op = Operator :: new ( cfg) ?
201+ . layer ( HttpClientLayer :: new ( HttpClient :: with ( reqwest_client) ) )
202+ . finish ( ) ;
203+
204+ let bytes = op. read_with ( "hello.txt" ) . range ( 0 ..2 ) . await ?. to_bytes ( ) ;
205+ assert_eq ! ( bytes. as_ref( ) , b"ok" ) ;
206+ server_task. await ??;
207+ Ok ( ( ) )
208+ }
209+
210+ async fn start_local_https_gcs_server ( ) -> anyhow:: Result < LocalHttpsGcsServer > {
211+ let cert_chain = vec ! [ CertificateDer :: from( decode_test_der(
212+ TEST_SERVER_CERT_DER_BASE64 ,
213+ ) ?) ] ;
214+ let private_key = PrivateKeyDer :: Sec1 ( PrivateSec1KeyDer :: from ( decode_test_der (
215+ TEST_SERVER_KEY_DER_BASE64 ,
216+ ) ?) ) ;
217+ let tls_config = rustls:: ServerConfig :: builder ( )
218+ . with_no_client_auth ( )
219+ . with_single_cert ( cert_chain, private_key) ?;
220+ let tls_acceptor = tokio_rustls:: TlsAcceptor :: from ( Arc :: new ( tls_config) ) ;
221+ let listener = TcpListener :: bind ( ( "127.0.0.1" , 0 ) ) . await ?;
222+ let endpoint = format ! ( "https://localhost:{}" , listener. local_addr( ) ?. port( ) ) ;
223+
224+ let server_task = tokio:: spawn ( async move {
225+ let ( stream, _) = listener. accept ( ) . await ?;
226+ let mut stream = tls_acceptor. accept ( stream) . await ?;
227+ let mut request = Vec :: new ( ) ;
228+ let mut buffer = [ 0u8 ; 1024 ] ;
229+ loop {
230+ let bytes_read = stream. read ( & mut buffer) . await ?;
231+ if bytes_read == 0 {
232+ break ;
233+ }
234+ request. extend_from_slice ( & buffer[ ..bytes_read] ) ;
235+ if request. windows ( 4 ) . any ( |window| window == b"\r \n \r \n " ) {
236+ break ;
237+ }
238+ }
239+ let request = String :: from_utf8_lossy ( & request) ;
240+ assert ! (
241+ request. starts_with(
242+ "GET /storage/v1/b/quickwit-test-bucket/o/hello.txt?alt=media HTTP/"
243+ ) ,
244+ "unexpected GCS request line: {request}"
245+ ) ;
246+ assert ! (
247+ request
248+ . to_ascii_lowercase( )
249+ . contains( "\r \n range: bytes=0-1\r \n " ) ,
250+ "expected range read request header: {request}"
251+ ) ;
252+
253+ stream
254+ . write_all (
255+ b"HTTP/1.1 206 Partial Content\r \n \
256+ Content-Length: 2\r \n \
257+ Content-Range: bytes 0-1/2\r \n \
258+ Accept-Ranges: bytes\r \n \
259+ Connection: close\r \n \
260+ \r \n \
261+ ok",
262+ )
263+ . await ?;
264+ stream. shutdown ( ) . await ?;
265+ Ok ( ( ) )
266+ } ) ;
267+
268+ Ok ( ( endpoint, server_task) )
269+ }
270+
271+ fn decode_test_der ( base64_der : & str ) -> anyhow:: Result < Vec < u8 > > {
272+ Ok ( base64:: engine:: general_purpose:: STANDARD . decode ( base64_der) ?)
273+ }
137274}
0 commit comments