@@ -11,15 +11,12 @@ use graph::components::subgraph::Settings;
1111use graph:: data:: subgraph:: schema:: DeploymentCreate ;
1212use graph:: data:: subgraph:: Graft ;
1313use graph:: data:: value:: Word ;
14- use graph:: futures01:: stream;
15- use graph:: futures01:: Future ;
16- use graph:: futures01:: Stream ;
1714use graph:: futures03;
18- use graph:: futures03:: compat:: Future01CompatExt ;
19- use graph:: futures03:: compat:: Stream01CompatExt ;
2015use graph:: futures03:: future:: FutureExt ;
2116use graph:: futures03:: future:: TryFutureExt ;
17+ use graph:: futures03:: stream;
2218use graph:: futures03:: stream:: TryStreamExt ;
19+ use graph:: futures03:: Stream ;
2320use graph:: futures03:: StreamExt ;
2421use graph:: prelude:: {
2522 CreateSubgraphResult , SubgraphAssignmentProvider as SubgraphAssignmentProviderTrait ,
8178 }
8279
8380 pub async fn start ( self : Arc < Self > ) -> Result < ( ) , Error > {
84- let logger_clone1 = self . logger . clone ( ) ;
85- let logger_clone2 = self . logger . clone ( ) ;
86- let provider = self . provider . clone ( ) ;
87- let node_id = self . node_id . clone ( ) ;
88- let assignment_event_stream_cancel_handle =
89- self . assignment_event_stream_cancel_guard . handle ( ) ;
90-
9181 // The order of the following three steps is important:
9282 // - Start assignment event stream
9383 // - Read assignments table and start assigned subgraphs
@@ -109,114 +99,138 @@ where
10999 // The `handle_assignment_events` function handles these cases by ignoring AlreadyRunning
110100 // (on subgraph start) which makes the operations idempotent. Subgraph stop is already idempotent.
111101
102+ fn panic_on_cancel (
103+ logger : & Logger ,
104+ e : CancelableError < SubgraphAssignmentProviderError > ,
105+ ) -> ! {
106+ match e {
107+ CancelableError :: Cancel => {
108+ panic ! ( "assignment event stream canceled" )
109+ }
110+ CancelableError :: Error ( e) => {
111+ error ! ( logger, "Assignment event stream failed: {}" , e) ;
112+ panic ! ( "assignment event stream failed: {}" , e) ;
113+ }
114+ }
115+ }
116+
112117 // Start event stream
113- let assignment_event_stream = self . assignment_events ( ) ;
118+ let assignment_event_stream = self . cheap_clone ( ) . assignment_events ( ) . await ;
114119
115120 // Deploy named subgraphs found in store
116121 self . start_assigned_subgraphs ( ) . await ?;
117122
118123 // Spawn a task to handle assignment events.
119124 // Blocking due to store interactions. Won't be blocking after #905.
120- graph:: spawn_blocking (
121- assignment_event_stream
122- . compat ( )
123- . map_err ( SubgraphAssignmentProviderError :: Unknown )
125+ let assignment_event_stream_cancel_handle =
126+ self . assignment_event_stream_cancel_guard . handle ( ) ;
127+
128+ let fut =
129+ Box :: pin ( assignment_event_stream. map_err ( SubgraphAssignmentProviderError :: Unknown ) )
124130 . cancelable ( & assignment_event_stream_cancel_handle)
125- . compat ( )
126- . for_each ( move |assignment_event | {
127- assert_eq ! ( assignment_event . node_id ( ) , & node_id ) ;
128- handle_assignment_event (
129- assignment_event ,
130- provider . clone ( ) ,
131- logger_clone1 . clone ( ) ,
132- )
133- . boxed ( )
134- . compat ( )
135- } )
136- . map_err ( move |e| match e {
137- CancelableError :: Cancel => panic ! ( "assignment event stream canceled" ) ,
138- CancelableError :: Error ( e ) => {
139- error ! ( logger_clone2 , "Assignment event stream failed: {}" , e ) ;
140- panic ! ( "assignment event stream failed: {}" , e ) ;
131+ . for_each ( {
132+ move |event | {
133+ let this = self . cheap_clone ( ) ;
134+ let provider = self . provider . clone ( ) ;
135+ async move {
136+ if let Err ( e ) = match event {
137+ Ok ( event ) => {
138+ assert_eq ! ( event . node_id ( ) , & this . node_id ) ;
139+ handle_assignment_event ( event , provider . clone ( ) , & this . logger )
140+ . await
141+ }
142+ Err ( e ) => Err ( e ) ,
143+ } {
144+ panic_on_cancel ( & this . logger , e ) ;
145+ } ;
146+ }
141147 }
142- } )
143- . compat ( ) ,
144- ) ;
148+ } ) ;
145149
150+ graph:: spawn_blocking ( fut) ;
146151 Ok ( ( ) )
147152 }
148153
149- pub fn assignment_events ( & self ) -> impl Stream < Item = AssignmentEvent , Error = Error > + Send {
150- let store = self . store . clone ( ) ;
151- let node_id = self . node_id . clone ( ) ;
152- let logger = self . logger . clone ( ) ;
154+ /// Maps an assignment change to an assignment event by checking the
155+ /// current state in the database, ignoring changes that do not affect
156+ /// this node or do not require anything to change.
157+ fn map_assignment ( & self , change : AssignmentChange ) -> Result < Option < AssignmentEvent > , Error > {
158+ let ( deployment, operation) = change. into_parts ( ) ;
153159
160+ trace ! ( self . logger, "Received assignment change" ;
161+ "deployment" => %deployment,
162+ "operation" => format!( "{:?}" , operation) ,
163+ ) ;
164+
165+ match operation {
166+ AssignmentOperation :: Set => {
167+ let assigned = self
168+ . store
169+ . assignment_status ( & deployment)
170+ . map_err ( |e| anyhow ! ( "Failed to get subgraph assignment entity: {}" , e) ) ?;
171+
172+ let logger = self . logger . new ( o ! ( "subgraph_id" => deployment. hash. to_string( ) , "node_id" => self . node_id. to_string( ) ) ) ;
173+ if let Some ( ( assigned, is_paused) ) = assigned {
174+ if & assigned == & self . node_id {
175+ if is_paused {
176+ // Subgraph is paused, so we don't start it
177+ debug ! ( logger, "Deployment assignee is this node" ; "assigned_to" => assigned, "paused" => is_paused, "action" => "ignore" ) ;
178+ return Ok ( None ) ;
179+ }
180+
181+ // Start subgraph on this node
182+ debug ! ( logger, "Deployment assignee is this node" ; "assigned_to" => assigned, "action" => "add" ) ;
183+ Ok ( Some ( AssignmentEvent :: Add {
184+ deployment,
185+ node_id : self . node_id . clone ( ) ,
186+ } ) )
187+ } else {
188+ // Ensure it is removed from this node
189+ debug ! ( logger, "Deployment assignee is not this node" ; "assigned_to" => assigned, "action" => "remove" ) ;
190+ Ok ( Some ( AssignmentEvent :: Remove {
191+ deployment,
192+ node_id : self . node_id . clone ( ) ,
193+ } ) )
194+ }
195+ } else {
196+ // Was added/updated, but is now gone.
197+ debug ! ( self . logger, "Deployment assignee not found in database" ; "action" => "ignore" ) ;
198+ Ok ( None )
199+ }
200+ }
201+ AssignmentOperation :: Removed => {
202+ // Send remove event without checking node ID.
203+ // If node ID does not match, then this is a no-op when handled in
204+ // assignment provider.
205+ Ok ( Some ( AssignmentEvent :: Remove {
206+ deployment,
207+ node_id : self . node_id . clone ( ) ,
208+ } ) )
209+ }
210+ }
211+ }
212+
213+ pub async fn assignment_events (
214+ self : Arc < Self > ,
215+ ) -> impl Stream < Item = Result < AssignmentEvent , Error > > + Send {
154216 self . subscription_manager
155217 . subscribe ( )
156- . map_err ( |( ) | anyhow ! ( "Entity change stream failed" ) )
157- . map ( |event| {
158- let changes: Vec < _ > = event. changes . iter ( ) . cloned ( ) . map ( AssignmentChange :: into_parts) . collect ( ) ;
159- stream:: iter_ok ( changes)
160- } )
218+ . map ( |event| futures03:: stream:: iter ( event. changes . clone ( ) ) )
161219 . flatten ( )
162- . and_then (
163- move |( deployment, operation) | -> Result < Box < dyn Stream < Item = _ , Error = _ > + Send > , _ > {
164- trace ! ( logger, "Received assignment change" ;
165- "deployment" => %deployment,
166- "operation" => format!( "{:?}" , operation) ,
167- ) ;
168-
169- match operation {
170- AssignmentOperation :: Set => {
171- store
172- . assignment_status ( & deployment)
173- . map_err ( |e| {
174- anyhow ! ( "Failed to get subgraph assignment entity: {}" , e)
175- } )
176- . map ( |assigned| -> Box < dyn Stream < Item = _ , Error = _ > + Send > {
177- let logger = logger. new ( o ! ( "subgraph_id" => deployment. hash. to_string( ) , "node_id" => node_id. to_string( ) ) ) ;
178- if let Some ( ( assigned, is_paused) ) = assigned {
179- if assigned == node_id {
180-
181- if is_paused{
182- // Subgraph is paused, so we don't start it
183- debug ! ( logger, "Deployment assignee is this node" ; "assigned_to" => assigned, "paused" => is_paused, "action" => "ignore" ) ;
184- return Box :: new ( stream:: empty ( ) ) ;
185- }
186-
187- // Start subgraph on this node
188- debug ! ( logger, "Deployment assignee is this node" ; "assigned_to" => assigned, "action" => "add" ) ;
189- Box :: new ( stream:: once ( Ok ( AssignmentEvent :: Add {
190- deployment,
191- node_id : node_id. clone ( ) ,
192- } ) ) )
193- } else {
194- // Ensure it is removed from this node
195- debug ! ( logger, "Deployment assignee is not this node" ; "assigned_to" => assigned, "action" => "remove" ) ;
196- Box :: new ( stream:: once ( Ok ( AssignmentEvent :: Remove {
197- deployment,
198- node_id : node_id. clone ( ) ,
199- } ) ) )
200- }
201- } else {
202- // Was added/updated, but is now gone.
203- debug ! ( logger, "Deployment assignee not found in database" ; "action" => "ignore" ) ;
204- Box :: new ( stream:: empty ( ) )
205- }
206- } )
207- }
208- AssignmentOperation :: Removed => {
209- // Send remove event without checking node ID.
210- // If node ID does not match, then this is a no-op when handled in
211- // assignment provider.
212- Ok ( Box :: new ( stream:: once ( Ok ( AssignmentEvent :: Remove {
213- deployment,
214- node_id : node_id. clone ( ) ,
215- } ) ) ) )
220+ . then ( {
221+ let this = self . cheap_clone ( ) ;
222+ move |change| {
223+ let this = this. cheap_clone ( ) ;
224+
225+ async move {
226+ match this. map_assignment ( change) {
227+ Ok ( Some ( event) ) => stream:: once ( futures03:: future:: ok ( event) ) . boxed ( ) ,
228+ Ok ( None ) => stream:: empty ( ) . boxed ( ) ,
229+ Err ( e) => stream:: once ( futures03:: future:: err ( e) ) . boxed ( ) ,
216230 }
217231 }
218- } ,
219- )
232+ }
233+ } )
220234 . flatten ( )
221235 }
222236
@@ -235,6 +249,8 @@ where
235249 // the receiver terminates without receiving anything.
236250 let deployments = HashSet :: < DeploymentLocator > :: from_iter ( deployments) ;
237251 let deployments_len = deployments. len ( ) ;
252+ debug ! ( logger, "Starting all assigned subgraphs" ;
253+ "count" => deployments_len, "node_id" => & node_id) ;
238254 let ( sender, receiver) = futures03:: channel:: mpsc:: channel :: < ( ) > ( 1 ) ;
239255 for id in deployments {
240256 let sender = sender. clone ( ) ;
@@ -442,7 +458,7 @@ where
442458async fn handle_assignment_event (
443459 event : AssignmentEvent ,
444460 provider : Arc < impl SubgraphAssignmentProviderTrait > ,
445- logger : Logger ,
461+ logger : & Logger ,
446462) -> Result < ( ) , CancelableError < SubgraphAssignmentProviderError > > {
447463 let logger = logger. clone ( ) ;
448464
0 commit comments