@@ -13,7 +13,6 @@ use async_trait::async_trait;
1313use graph:: amp;
1414use graph:: blockchain:: block_stream:: { BlockStreamMetrics , TriggersAdapterWrapper } ;
1515use graph:: blockchain:: { Blockchain , BlockchainKind , DataSource , NodeCapabilities } ;
16- use graph:: components:: link_resolver:: LinkResolverContext ;
1716use graph:: components:: metrics:: gas:: GasMetrics ;
1817use graph:: components:: metrics:: subgraph:: DeploymentStatusMetric ;
1918use graph:: components:: store:: SourceableStore ;
@@ -30,7 +29,7 @@ use tokio::task;
3029
3130use super :: context:: OffchainMonitor ;
3231use super :: SubgraphTriggerProcessor ;
33- use crate :: subgraph:: runner:: SubgraphRunnerError ;
32+ use crate :: { subgraph:: runner:: SubgraphRunnerError , subgraph_manifest } ;
3433
3534#[ derive( Clone ) ]
3635pub struct SubgraphInstanceManager < S : SubgraphStore , AC > {
@@ -84,30 +83,22 @@ where
8483 let deployment_status_metric = deployment_status_metric. clone ( ) ;
8584
8685 async move {
87- let link_resolver = self
88- . link_resolver
89- . for_manifest ( & loc. hash . to_string ( ) )
90- . map_err ( SubgraphAssignmentProviderError :: ResolveError ) ?;
91-
92- let file_bytes = link_resolver
93- . cat (
94- & LinkResolverContext :: new ( & loc. hash , & logger) ,
95- & loc. hash . to_ipfs_link ( ) ,
96- )
97- . await
98- . map_err ( SubgraphAssignmentProviderError :: ResolveError ) ?;
99-
100- let manifest: serde_yaml:: Mapping = serde_yaml:: from_slice ( & file_bytes)
101- . map_err ( |e| SubgraphAssignmentProviderError :: ResolveError ( e. into ( ) ) ) ?;
102-
103- match BlockchainKind :: from_manifest ( & manifest) ? {
86+ let raw_manifest = subgraph_manifest:: load_raw_subgraph_manifest (
87+ & logger,
88+ & * instance_manager. subgraph_store ,
89+ & * instance_manager. link_resolver ,
90+ & loc. hash ,
91+ )
92+ . await ?;
93+
94+ match BlockchainKind :: from_manifest ( & raw_manifest) ? {
10495 BlockchainKind :: Ethereum => {
10596 let runner = instance_manager
10697 . build_subgraph_runner :: < graph_chain_ethereum:: Chain > (
10798 logger. clone ( ) ,
10899 self . env_vars . cheap_clone ( ) ,
109100 loc. clone ( ) ,
110- manifest ,
101+ raw_manifest ,
111102 stop_block,
112103 Box :: new ( SubgraphTriggerProcessor { } ) ,
113104 deployment_status_metric,
@@ -122,7 +113,7 @@ where
122113 logger. clone ( ) ,
123114 self . env_vars . cheap_clone ( ) ,
124115 loc. clone ( ) ,
125- manifest ,
116+ raw_manifest ,
126117 stop_block,
127118 Box :: new ( SubgraphTriggerProcessor { } ) ,
128119 deployment_status_metric,
@@ -137,7 +128,7 @@ where
137128 logger. clone ( ) ,
138129 self . env_vars . cheap_clone ( ) ,
139130 loc. cheap_clone ( ) ,
140- manifest ,
131+ raw_manifest ,
141132 stop_block,
142133 Box :: new ( graph_chain_substreams:: TriggerProcessor :: new (
143134 loc. clone ( ) ,
@@ -253,7 +244,7 @@ impl<S: SubgraphStore, AC: amp::Client> SubgraphInstanceManager<S, AC> {
253244 logger : Logger ,
254245 env_vars : Arc < EnvVars > ,
255246 deployment : DeploymentLocator ,
256- manifest : serde_yaml:: Mapping ,
247+ raw_manifest : serde_yaml:: Mapping ,
257248 stop_block : Option < BlockNumber > ,
258249 tp : Box < dyn TriggerProcessor < C , RuntimeHostBuilder < C > > > ,
259250 deployment_status_metric : DeploymentStatusMetric ,
@@ -266,7 +257,7 @@ impl<S: SubgraphStore, AC: amp::Client> SubgraphInstanceManager<S, AC> {
266257 logger,
267258 env_vars,
268259 deployment,
269- manifest ,
260+ raw_manifest ,
270261 stop_block,
271262 tp,
272263 deployment_status_metric,
@@ -280,7 +271,7 @@ impl<S: SubgraphStore, AC: amp::Client> SubgraphInstanceManager<S, AC> {
280271 logger : Logger ,
281272 env_vars : Arc < EnvVars > ,
282273 deployment : DeploymentLocator ,
283- manifest : serde_yaml:: Mapping ,
274+ raw_manifest : serde_yaml:: Mapping ,
284275 stop_block : Option < BlockNumber > ,
285276 tp : Box < dyn TriggerProcessor < C , RuntimeHostBuilder < C > > > ,
286277 deployment_status_metric : DeploymentStatusMetric ,
@@ -293,8 +284,8 @@ impl<S: SubgraphStore, AC: amp::Client> SubgraphInstanceManager<S, AC> {
293284 let subgraph_store = self . subgraph_store . cheap_clone ( ) ;
294285 let registry = self . metrics_registry . cheap_clone ( ) ;
295286
296- let raw_yaml = serde_yaml :: to_string ( & manifest ) . unwrap ( ) ;
297- let manifest = UnresolvedSubgraphManifest :: parse ( deployment. hash . cheap_clone ( ) , manifest ) ?;
287+ let manifest =
288+ UnresolvedSubgraphManifest :: parse ( deployment. hash . cheap_clone ( ) , raw_manifest ) ?;
298289
299290 // Allow for infinite retries for subgraph definition files.
300291 let link_resolver = Arc :: from (
@@ -304,24 +295,16 @@ impl<S: SubgraphStore, AC: amp::Client> SubgraphInstanceManager<S, AC> {
304295 . with_retries ( ) ,
305296 ) ;
306297
307- // Make sure the `raw_yaml` is present on both this subgraph and the graft base.
308- self . subgraph_store
309- . set_manifest_raw_yaml ( & deployment. hash , raw_yaml)
310- . await ?;
311298 if let Some ( graft) = & manifest. graft {
312299 if self . subgraph_store . is_deployed ( & graft. base ) . await ? {
313- let file_bytes = self
314- . link_resolver
315- . cat (
316- & LinkResolverContext :: new ( & deployment. hash , & logger) ,
317- & graft. base . to_ipfs_link ( ) ,
318- )
319- . await ?;
320- let yaml = String :: from_utf8 ( file_bytes) ?;
321-
322- self . subgraph_store
323- . set_manifest_raw_yaml ( & graft. base , yaml)
324- . await ?;
300+ // Makes sure the raw manifest is cached in the subgraph store
301+ let _raw_manifest = subgraph_manifest:: load_raw_subgraph_manifest (
302+ & logger,
303+ & * self . subgraph_store ,
304+ & * self . link_resolver ,
305+ & graft. base ,
306+ )
307+ . await ?;
325308 }
326309 }
327310
0 commit comments