forked from grafbase/grafbase
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsubscription.rs
More file actions
93 lines (87 loc) · 3.12 KB
/
subscription.rs
File metadata and controls
93 lines (87 loc) · 3.12 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
use futures_util::{stream::BoxStream, StreamExt};
use runtime::{fetch::GraphqlRequest, rate_limiting::RateLimitKey};
use serde::de::DeserializeSeed;
use super::{
deserialize::{GraphqlResponseSeed, RootGraphqlErrors},
variables::SubgraphVariables,
ExecutionContext, GraphqlPreparedExecutor,
};
use crate::{
execution::{ExecutionError, PlanWalker, SubscriptionResponse},
sources::ExecutionResult,
Runtime,
};
impl GraphqlPreparedExecutor {
pub async fn execute_subscription<'ctx, R: Runtime>(
&'ctx self,
ctx: ExecutionContext<'ctx, R>,
plan: PlanWalker<'ctx>,
new_response: impl Fn() -> SubscriptionResponse + Send + 'ctx,
) -> ExecutionResult<BoxStream<'ctx, ExecutionResult<SubscriptionResponse>>> {
let subgraph = ctx.schema().walk(self.subgraph_id);
let url = {
let mut url = subgraph.websocket_url().clone();
// If the user doesn't provide an explicit websocket URL we use the normal URL,
// so make sure to convert the scheme to something appropriate
match url.scheme() {
"http" => url.set_scheme("ws").expect("this to work"),
"https" => url.set_scheme("wss").expect("this to work"),
_ => {}
}
url
};
ctx.engine
.runtime
.rate_limiter()
.limit(&RateLimitKey::Subgraph(subgraph.name().into()))
.await?;
let stream = ctx
.engine
.runtime
.fetcher()
.stream(GraphqlRequest {
url: &url,
query: &self.operation.query,
variables: serde_json::to_value(&SubgraphVariables::<()> {
plan,
variables: &self.operation.variables,
inputs: Vec::new(),
})
.map_err(|error| error.to_string())?,
headers: ctx.subgraph_headers_with_rules(subgraph.header_rules()),
})
.await
.map_err(|error| ExecutionError::Fetch {
subgraph_name: subgraph.name().to_string(),
error,
})?;
Ok(Box::pin(stream.map(move |subgraph_response| {
let mut subscription_response = new_response();
ingest_response(
&mut subscription_response,
plan,
subgraph_response.map_err(|error| ExecutionError::Fetch {
subgraph_name: subgraph.name().to_string(),
error,
})?,
)?;
Ok(subscription_response)
})))
}
}
/// Deserializes one subgraph payload into the root of `subscription_response`.
///
/// The payload is driven through a [`GraphqlResponseSeed`] built from the root
/// response's next seed for `plan` and a [`RootGraphqlErrors`] holding the root
/// response and the plan's response keys.
///
/// # Errors
///
/// Returns an error if deserializing `subgraph_response` fails.
///
/// # Panics
///
/// Panics if the root response yields no seed for `plan`.
fn ingest_response(
    subscription_response: &mut SubscriptionResponse,
    plan: PlanWalker<'_>,
    subgraph_response: serde_json::Value,
) -> ExecutionResult<()> {
    let root = subscription_response.root_response();
    let update_seed = root
        .next_seed(plan)
        .expect("Must have a root object to update");
    let root_errors = RootGraphqlErrors {
        response: root,
        response_keys: plan.response_keys(),
    };

    // Only the error matters here: the deserialized value itself is discarded.
    GraphqlResponseSeed::new(update_seed, root_errors).deserialize(subgraph_response)?;
    Ok(())
}