Skip to content

Commit 197bdad

Browse files
authored
[tests] Stream p3 http request bodies concurrently (#13408)
Move the P3 HTTP fixture helper's request body write/drop path into the outer join with client::send and the transmit future. This avoids requiring client::send to complete before the request body producer is driven to eof, which can deadlock with servers that read the full request body before sending response headers.
1 parent b59a9b2 commit 197bdad

1 file changed

Lines changed: 19 additions & 24 deletions

File tree

crates/test-programs/src/p3/http.rs

Lines changed: 19 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -88,37 +88,32 @@ pub async fn request(
8888
.set_path_with_query(Some(&path_with_query))
8989
.map_err(|()| anyhow!("failed to set path_with_query"))?;
9090

91-
let (transmit, handle) = join!(
91+
let (transmit, (), handle) = join!(
9292
async { transmit.await.context("failed to transmit request") },
93+
async {
94+
if let Some(buf) = body {
95+
let remaining = contents_tx.write_all(buf.into()).await;
96+
assert!(remaining.is_empty());
97+
}
98+
drop(contents_tx);
99+
// This can fail in HTTP/1.1, since the connection might already be closed
100+
_ = trailers_tx.write(Ok(None)).await;
101+
},
93102
async {
94103
let response = client::send(request).await?;
95104
let status = response.get_status_code();
96105
let headers = response.get_headers().copy_all();
97106
let (_, result_rx) = wit_future::new(|| Ok(()));
98107
let (body_rx, trailers_rx) = types::Response::consume_body(response, result_rx);
99-
let ((), rx) = join!(
100-
async {
101-
if let Some(buf) = body {
102-
let remaining = contents_tx.write_all(buf.into()).await;
103-
assert!(remaining.is_empty());
104-
}
105-
drop(contents_tx);
106-
// This can fail in HTTP/1.1, since the connection might already be closed
107-
_ = trailers_tx.write(Ok(None)).await;
108-
},
109-
async {
110-
let body = body_rx.collect().await;
111-
let trailers = trailers_rx.await.context("failed to read body")?;
112-
let trailers = trailers.map(|trailers| trailers.copy_all());
113-
anyhow::Ok(Response {
114-
status,
115-
headers,
116-
body,
117-
trailers,
118-
})
119-
}
120-
);
121-
rx
108+
let body = body_rx.collect().await;
109+
let trailers = trailers_rx.await.context("failed to read body")?;
110+
let trailers = trailers.map(|trailers| trailers.copy_all());
111+
anyhow::Ok(Response {
112+
status,
113+
headers,
114+
body,
115+
trailers,
116+
})
122117
},
123118
);
124119
let response = handle?;

0 commit comments

Comments
 (0)