-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathgateway_nanny.rs
More file actions
154 lines (127 loc) · 4.92 KB
/
gateway_nanny.rs
File metadata and controls
154 lines (127 loc) · 4.92 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
use std::{collections::HashMap, sync::Arc};
use crate::ConfigWatcher;
use super::bus::{EngineSender, GraphWatcher};
use engine_v2::Engine;
use futures_concurrency::stream::Merge;
use futures_util::{future::BoxFuture, stream::BoxStream, FutureExt as _, StreamExt};
use gateway_config::GraphRateLimit;
use runtime::rate_limiting::RateLimitKey;
use runtime_local::{rate_limiting::in_memory::key_based::InMemoryRateLimiter, InMemoryEntityCache};
use tokio_stream::wrappers::WatchStream;
/// The GatewayNanny looks after the `Gateway` - on updates to the graph or config it'll
/// create a new `Gateway` and publish it on the gateway channel
pub(crate) struct EngineNanny {
graph: GraphWatcher,
config: ConfigWatcher,
sender: EngineSender,
}
impl EngineNanny {
pub fn new(graph: GraphWatcher, config: ConfigWatcher, sender: EngineSender) -> Self {
Self { graph, config, sender }
}
pub async fn handler(self) {
log::trace!("starting the gateway nanny");
let streams: [BoxStream<'static, NannyMessage>; 2] = [
Box::pin(WatchStream::new(self.graph.clone()).map(|_| NannyMessage::GraphUpdated)),
Box::pin(WatchStream::new(self.config.clone()).map(|_| NannyMessage::ConfigUpdated)),
];
let mut stream = streams.merge();
while let Some(message) = stream.next().await {
log::trace!("nanny received a {message:?}");
let config = self
.graph
.borrow()
.clone()
.map(|graph| engine_config_builder::build_with_sdl_config(&self.config.borrow(), graph));
let gateway = new_gateway(config).await;
if let Err(error) = self.sender.send(gateway) {
log::error!("Couldn't publish new gateway: {error:?}");
}
}
}
}
pub(super) async fn new_gateway(config: Option<engine_v2::VersionedConfig>) -> Option<Arc<Engine<CliRuntime>>> {
let config = config?.into_latest();
let runtime = CliRuntime {
fetcher: runtime_local::NativeFetcher::runtime_fetcher(),
trusted_documents: runtime::trusted_documents_client::Client::new(
runtime_noop::trusted_documents::NoopTrustedDocuments,
),
kv: runtime_local::InMemoryKvStore::runtime(),
meter: grafbase_telemetry::metrics::meter_from_global_provider(),
// FIXME: God is this ugly
rate_limiter: InMemoryRateLimiter::runtime({
let mut key_based_config = HashMap::new();
if let Some(global_config) = config.rate_limit.as_ref().and_then(|c| c.global) {
key_based_config.insert(
RateLimitKey::Global,
GraphRateLimit {
limit: global_config.limit,
duration: global_config.duration,
},
);
}
for (subgraph_name, subgraph) in config.subgraph_configs.iter() {
if let Some(limit) = subgraph.rate_limit {
let name = &config.graph[config.graph[*subgraph_name].name];
key_based_config.insert(
RateLimitKey::Subgraph(name.clone().into()),
GraphRateLimit {
limit: limit.limit,
duration: limit.duration,
},
);
}
}
key_based_config
}),
entity_cache: InMemoryEntityCache::default(),
};
let schema = config.try_into().ok()?;
let engine = Engine::new(Arc::new(schema), None, runtime).await;
Some(Arc::new(engine))
}
pub struct CliRuntime {
fetcher: runtime::fetch::Fetcher,
trusted_documents: runtime::trusted_documents_client::Client,
kv: runtime::kv::KvStore,
meter: grafbase_telemetry::otel::opentelemetry::metrics::Meter,
rate_limiter: runtime::rate_limiting::RateLimiter,
entity_cache: InMemoryEntityCache,
}
impl engine_v2::Runtime for CliRuntime {
type Hooks = ();
type OperationCacheFactory = ();
fn fetcher(&self) -> &runtime::fetch::Fetcher {
&self.fetcher
}
fn trusted_documents(&self) -> &runtime::trusted_documents_client::Client {
&self.trusted_documents
}
fn kv(&self) -> &runtime::kv::KvStore {
&self.kv
}
fn meter(&self) -> &grafbase_telemetry::otel::opentelemetry::metrics::Meter {
&self.meter
}
fn hooks(&self) -> &() {
&()
}
fn operation_cache_factory(&self) -> &() {
&()
}
fn rate_limiter(&self) -> &runtime::rate_limiting::RateLimiter {
&self.rate_limiter
}
fn sleep(&self, duration: std::time::Duration) -> BoxFuture<'static, ()> {
tokio::time::sleep(duration).boxed()
}
fn entity_cache(&self) -> &dyn runtime::entity_cache::EntityCache {
&self.entity_cache
}
}
#[derive(Debug)]
enum NannyMessage {
GraphUpdated,
ConfigUpdated,
}