@@ -11,16 +11,16 @@ use graph::components::subgraph::Settings;
1111use graph:: data:: subgraph:: schema:: DeploymentCreate ;
1212use graph:: data:: subgraph:: Graft ;
1313use graph:: data:: value:: Word ;
14- use graph:: futures01;
15- use graph:: futures01:: future;
1614use graph:: futures01:: stream;
1715use graph:: futures01:: Future ;
1816use graph:: futures01:: Stream ;
17+ use graph:: futures03;
1918use graph:: futures03:: compat:: Future01CompatExt ;
2019use graph:: futures03:: compat:: Stream01CompatExt ;
2120use graph:: futures03:: future:: FutureExt ;
2221use graph:: futures03:: future:: TryFutureExt ;
2322use graph:: futures03:: stream:: TryStreamExt ;
23+ use graph:: futures03:: StreamExt ;
2424use graph:: prelude:: {
2525 CreateSubgraphResult , SubgraphAssignmentProvider as SubgraphAssignmentProviderTrait ,
2626 SubgraphRegistrar as SubgraphRegistrarTrait , * ,
8080 }
8181 }
8282
83- pub fn start ( & self ) -> impl Future < Item = ( ) , Error = Error > {
83+ pub async fn start ( self : Arc < Self > ) -> Result < ( ) , Error > {
8484 let logger_clone1 = self . logger . clone ( ) ;
8585 let logger_clone2 = self . logger . clone ( ) ;
8686 let provider = self . provider . clone ( ) ;
@@ -113,37 +113,37 @@ where
113113 let assignment_event_stream = self . assignment_events ( ) ;
114114
115115 // Deploy named subgraphs found in store
116- self . start_assigned_subgraphs ( ) . and_then ( move |( ) | {
117- // Spawn a task to handle assignment events.
118- // Blocking due to store interactions. Won't be blocking after #905.
119- graph:: spawn_blocking (
120- assignment_event_stream
121- . compat ( )
122- . map_err ( SubgraphAssignmentProviderError :: Unknown )
123- . cancelable ( & assignment_event_stream_cancel_handle)
116+ self . start_assigned_subgraphs ( ) . await ?;
117+
118+ // Spawn a task to handle assignment events.
119+ // Blocking due to store interactions. Won't be blocking after #905.
120+ graph:: spawn_blocking (
121+ assignment_event_stream
122+ . compat ( )
123+ . map_err ( SubgraphAssignmentProviderError :: Unknown )
124+ . cancelable ( & assignment_event_stream_cancel_handle)
125+ . compat ( )
126+ . for_each ( move |assignment_event| {
127+ assert_eq ! ( assignment_event. node_id( ) , & node_id) ;
128+ handle_assignment_event (
129+ assignment_event,
130+ provider. clone ( ) ,
131+ logger_clone1. clone ( ) ,
132+ )
133+ . boxed ( )
124134 . compat ( )
125- . for_each ( move |assignment_event| {
126- assert_eq ! ( assignment_event. node_id( ) , & node_id) ;
127- handle_assignment_event (
128- assignment_event,
129- provider. clone ( ) ,
130- logger_clone1. clone ( ) ,
131- )
132- . boxed ( )
133- . compat ( )
134- } )
135- . map_err ( move |e| match e {
136- CancelableError :: Cancel => panic ! ( "assignment event stream canceled" ) ,
137- CancelableError :: Error ( e) => {
138- error ! ( logger_clone2, "Assignment event stream failed: {}" , e) ;
139- panic ! ( "assignment event stream failed: {}" , e) ;
140- }
141- } )
142- . compat ( ) ,
143- ) ;
135+ } )
136+ . map_err ( move |e| match e {
137+ CancelableError :: Cancel => panic ! ( "assignment event stream canceled" ) ,
138+ CancelableError :: Error ( e) => {
139+ error ! ( logger_clone2, "Assignment event stream failed: {}" , e) ;
140+ panic ! ( "assignment event stream failed: {}" , e) ;
141+ }
142+ } )
143+ . compat ( ) ,
144+ ) ;
144145
145- Ok ( ( ) )
146- } )
146+ Ok ( ( ) )
147147 }
148148
149149 pub fn assignment_events ( & self ) -> impl Stream < Item = AssignmentEvent , Error = Error > + Send {
@@ -220,36 +220,33 @@ where
220220 . flatten ( )
221221 }
222222
223- fn start_assigned_subgraphs ( & self ) -> impl Future < Item = ( ) , Error = Error > {
223+ async fn start_assigned_subgraphs ( & self ) -> Result < ( ) , Error > {
224224 let provider = self . provider . clone ( ) ;
225225 let logger = self . logger . clone ( ) ;
226226 let node_id = self . node_id . clone ( ) ;
227227
228- future:: result ( self . store . active_assignments ( & self . node_id ) )
229- . map_err ( |e| anyhow ! ( "Error querying subgraph assignments: {}" , e) )
230- . and_then ( move |deployments| {
231- // This operation should finish only after all subgraphs are
232- // started. We wait for the spawned tasks to complete by giving
233- // each a `sender` and waiting for all of them to be dropped, so
234- // the receiver terminates without receiving anything.
235- let deployments = HashSet :: < DeploymentLocator > :: from_iter ( deployments) ;
236- let deployments_len = deployments. len ( ) ;
237- let ( sender, receiver) = futures01:: sync:: mpsc:: channel :: < ( ) > ( 1 ) ;
238- for id in deployments {
239- let sender = sender. clone ( ) ;
240- let logger = logger. clone ( ) ;
241-
242- graph:: spawn (
243- start_subgraph ( id, provider. clone ( ) , logger) . map ( move |( ) | drop ( sender) ) ,
244- ) ;
245- }
246- drop ( sender) ;
247- receiver. collect ( ) . then ( move |_| {
248- info ! ( logger, "Started all assigned subgraphs" ;
249- "count" => deployments_len, "node_id" => & node_id) ;
250- future:: ok ( ( ) )
251- } )
252- } )
228+ let deployments = self
229+ . store
230+ . active_assignments ( & self . node_id )
231+ . map_err ( |e| anyhow ! ( "Error querying subgraph assignments: {}" , e) ) ?;
232+ // This operation should finish only after all subgraphs are
233+ // started. We wait for the spawned tasks to complete by giving
234+ // each a `sender` and waiting for all of them to be dropped, so
235+ // the receiver terminates without receiving anything.
236+ let deployments = HashSet :: < DeploymentLocator > :: from_iter ( deployments) ;
237+ let deployments_len = deployments. len ( ) ;
238+ let ( sender, receiver) = futures03:: channel:: mpsc:: channel :: < ( ) > ( 1 ) ;
239+ for id in deployments {
240+ let sender = sender. clone ( ) ;
241+ let logger = logger. clone ( ) ;
242+
243+ graph:: spawn ( start_subgraph ( id, provider. clone ( ) , logger) . map ( move |( ) | drop ( sender) ) ) ;
244+ }
245+ drop ( sender) ;
246+ let _: Vec < _ > = receiver. collect ( ) . await ;
247+ info ! ( logger, "Started all assigned subgraphs" ;
248+ "count" => deployments_len, "node_id" => & node_id) ;
249+ Ok ( ( ) )
253250 }
254251}
255252
0 commit comments