Skip to content

Commit 9d9e2df

Browse files
committed
clients can use multipart too
1 parent c184767 commit 9d9e2df

4 files changed

Lines changed: 136 additions & 12 deletions

File tree

bin/router/src/pipeline/header.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ lazy_static! {
1717
pub static ref APPLICATION_GRAPHQL_RESPONSE_JSON: HeaderValue =
1818
HeaderValue::from_static(&APPLICATION_GRAPHQL_RESPONSE_JSON_STR);
1919
pub static ref TEXT_EVENT_STREAM: &'static str = "text/event-stream";
20+
pub static ref MULTIPART_MIXED: &'static str = r#"multipart/mixed;subscriptionSpec="1.0""#;
2021
}
2122

2223
pub trait RequestAccepts {

bin/router/src/pipeline/mod.rs

Lines changed: 32 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use std::{sync::Arc, time::Instant};
22

3+
use futures_util::Stream;
34
use hive_router_plan_executor::execution::{
45
client_request_details::{ClientRequestDetails, JwtRequestDetails, OperationDetails},
56
plan::QueryPlanExecutionResult,
@@ -24,8 +25,8 @@ use crate::{
2425
execution_request::get_execution_request,
2526
header::{
2627
RequestAccepts, APPLICATION_GRAPHQL_RESPONSE_JSON,
27-
APPLICATION_GRAPHQL_RESPONSE_JSON_STR, APPLICATION_JSON, TEXT_EVENT_STREAM,
28-
TEXT_HTML_CONTENT_TYPE,
28+
APPLICATION_GRAPHQL_RESPONSE_JSON_STR, APPLICATION_JSON, MULTIPART_MIXED,
29+
TEXT_EVENT_STREAM, TEXT_HTML_CONTENT_TYPE,
2930
},
3031
normalize::{normalize_request_with_cache, GraphQLNormalizationPayload},
3132
parser::parse_operation_with_cache,
@@ -108,13 +109,32 @@ pub async fn graphql_request_handler(
108109
.body(response_bytes)
109110
}
110111
Ok(QueryPlanExecutionResult::Stream(response)) => {
111-
use crate::pipeline::sse;
112+
use crate::pipeline::{header::TEXT_EVENT_STREAM, multipart_subscribe, sse};
112113

113-
let response_content_type = http::HeaderValue::from_static("text/event-stream");
114-
let body = Box::pin(sse::create_stream(
115-
response.body,
116-
std::time::Duration::from_secs(10),
117-
));
114+
// TODO: respect order of Accept header
115+
let (response_content_type, body): (
116+
http::HeaderValue,
117+
std::pin::Pin<
118+
Box<dyn Stream<Item = Result<ntex::util::Bytes, std::io::Error>> + Send>,
119+
>,
120+
) = if req.accepts_content_type(*TEXT_EVENT_STREAM) {
121+
(
122+
http::HeaderValue::from_static("text/event-stream"),
123+
Box::pin(sse::create_stream(
124+
response.body,
125+
std::time::Duration::from_secs(10),
126+
)),
127+
)
128+
} else {
129+
// NOTE: client accept headers should have been validated earlier
130+
(
131+
http::HeaderValue::from_static("multipart/mixed;boundary=graphql"),
132+
Box::pin(multipart_subscribe::create_stream(
133+
response.body,
134+
std::time::Duration::from_secs(10),
135+
)),
136+
)
137+
};
118138

119139
let mut response_builder = web::HttpResponse::Ok();
120140
for (header_name, header_value) in response.headers {
@@ -162,7 +182,10 @@ pub async fn execute_pipeline(
162182
Some(OperationKind::Subscription)
163183
);
164184

165-
if is_subscription && !req.accepts_content_type(*TEXT_EVENT_STREAM) {
185+
if is_subscription
186+
&& !req.accepts_content_type(*MULTIPART_MIXED)
187+
&& !req.accepts_content_type(*TEXT_EVENT_STREAM)
188+
{
166189
return Err(
167190
req.new_pipeline_error(PipelineErrorVariant::SubscriptionNotSupportedOverTransport)
168191
);

e2e/src/subscriptions.rs

Lines changed: 102 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ mod subscription_e2e_tests {
2020
}
2121

2222
#[ntex::test]
23-
async fn subscription_no_entity_resolution_sse() {
23+
async fn subscription_no_entity_resolution_sse_subgraph() {
2424
let _subgraphs_server = SubgraphsServer::start_with_subscriptions_protocol(
2525
subgraphs::SubscriptionProtocol::SseOnly,
2626
)
@@ -107,7 +107,7 @@ mod subscription_e2e_tests {
107107
}
108108

109109
#[ntex::test]
110-
async fn subscription_no_entity_resolution_multipart() {
110+
async fn subscription_no_entity_resolution_multipart_subgraph() {
111111
let _subgraphs_server = SubgraphsServer::start_with_subscriptions_protocol(
112112
subgraphs::SubscriptionProtocol::MultipartOnly,
113113
)
@@ -278,6 +278,106 @@ mod subscription_e2e_tests {
278278
);
279279
}
280280

281+
#[ntex::test]
282+
async fn subscription_yes_entity_resolution_multipart_client() {
283+
let _subgraphs_server = SubgraphsServer::start().await;
284+
285+
let router = init_router_from_config_inline(&format!(
286+
r#"
287+
supergraph:
288+
source: file
289+
path: supergraph.graphql
290+
"#
291+
))
292+
.await
293+
.unwrap();
294+
295+
wait_for_readiness(&router.app).await;
296+
297+
let req = init_graphql_request(
298+
r#"
299+
subscription {
300+
reviewAdded(intervalInMs: 0) {
301+
id
302+
product {
303+
name
304+
}
305+
}
306+
}
307+
"#,
308+
None,
309+
)
310+
.header(
311+
http::header::ACCEPT,
312+
// as per https://www.apollographql.com/docs/graphos/routing/operations/subscriptions/multipart-protocol#executing-a-subscription
313+
r#"multipart/mixed;subscriptionSpec="1.0", application/json"#,
314+
)
315+
.to_request();
316+
317+
let res = test::call_service(&router.app, req).await;
318+
319+
assert!(res.status().is_success(), "Expected 200 OK");
320+
321+
let content_type_header = get_content_type_header(&res);
322+
323+
let body = test::read_body(res).await;
324+
let body_str = std::str::from_utf8(&body).unwrap();
325+
326+
assert_snapshot!(body_str, @r#"
327+
--graphql
328+
Content-Type: application/json
329+
330+
{"payload":{"data":{"reviewAdded":{"id":"1","product":{"name":"Table"}}}}}
331+
--graphql
332+
Content-Type: application/json
333+
334+
{"payload":{"data":{"reviewAdded":{"id":"2","product":{"name":"Table"}}}}}
335+
--graphql
336+
Content-Type: application/json
337+
338+
{"payload":{"data":{"reviewAdded":{"id":"3","product":{"name":"Table"}}}}}
339+
--graphql
340+
Content-Type: application/json
341+
342+
{"payload":{"data":{"reviewAdded":{"id":"4","product":{"name":"Table"}}}}}
343+
--graphql
344+
Content-Type: application/json
345+
346+
{"payload":{"data":{"reviewAdded":{"id":"5","product":{"name":"Couch"}}}}}
347+
--graphql
348+
Content-Type: application/json
349+
350+
{"payload":{"data":{"reviewAdded":{"id":"6","product":{"name":"Couch"}}}}}
351+
--graphql
352+
Content-Type: application/json
353+
354+
{"payload":{"data":{"reviewAdded":{"id":"7","product":{"name":"Couch"}}}}}
355+
--graphql
356+
Content-Type: application/json
357+
358+
{"payload":{"data":{"reviewAdded":{"id":"8","product":{"name":"Couch"}}}}}
359+
--graphql
360+
Content-Type: application/json
361+
362+
{"payload":{"data":{"reviewAdded":{"id":"9","product":{"name":"Glass"}}}}}
363+
--graphql
364+
Content-Type: application/json
365+
366+
{"payload":{"data":{"reviewAdded":{"id":"10","product":{"name":"Chair"}}}}}
367+
--graphql
368+
Content-Type: application/json
369+
370+
{"payload":{"data":{"reviewAdded":{"id":"11","product":{"name":"Chair"}}}}}
371+
--graphql--
372+
"#);
373+
374+
// we check this at the end because the body will hold clues to why the test fails
375+
assert_eq!(
376+
content_type_header, "multipart/mixed;boundary=graphql",
377+
"Expected Content-Type to be multipart/mixed;boundary=graphql"
378+
);
379+
}
380+
281381
#[ntex::test]
282382
async fn subscription_entity_resolution_with_requires() {
283383
let _subgraphs_server = SubgraphsServer::start().await;

lib/executor/src/executors/http.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -363,7 +363,7 @@ impl SubgraphExecutor for HTTPSubgraphExecutor {
363363
headers.insert(
364364
http::header::ACCEPT,
365365
HeaderValue::from_static(
366-
r#"multipart/mixed;boundary="graphql";subscriptionSpec="1.0", text/event-stream"#,
366+
r#"multipart/mixed;subscriptionSpec="1.0", text/event-stream"#,
367367
),
368368
);
369369
*req.headers_mut() = headers;

0 commit comments

Comments
 (0)