@@ -661,27 +661,29 @@ func (sched *scheduler) BuildDependencyGraph(options interfaces.DependencyGraphO
661661 return depGraph , nil
662662}
663663
664+ type interimGraphVertex struct {
665+ dependency client.Dependency
666+ scheduledResource * ScheduledResource
667+ parentContext * graphContext
668+ }
669+
664670func (sched * scheduler ) fillDependencyGraph (rootContext * graphContext ,
665671 resDefs map [string ]client.ResourceDefinition ,
666672 dependencies map [string ][]client.Dependency ,
667673 flow * client.Flow , replicas []client.Replica , useDestructionSelector bool ) error {
668674
669- type Block struct {
670- dependency client.Dependency
671- scheduledResource * ScheduledResource
672- parentContext * graphContext
673- }
674- blocks := map [string ][]* Block {}
675+ var vertices [][]interimGraphVertex
675676 silent := rootContext .graph .Options ().Silent
676677
677678 for _ , replica := range replicas {
679+ var replicaVertices []interimGraphVertex
678680 replicaName := replica .ReplicaName ()
679681 replicaContext := sched .prepareContext (rootContext , nil , replicaName )
680682 queue := list .New ()
681- queue .PushFront (& Block {dependency : client.Dependency {Child : "flow/" + flow .Name }})
683+ queue .PushFront (interimGraphVertex {dependency : client.Dependency {Child : "flow/" + flow .Name }})
682684
683685 for e := queue .Front (); e != nil ; e = e .Next () {
684- parent := e .Value .(* Block )
686+ parent := e .Value .(interimGraphVertex )
685687
686688 deps := filterDependencies (dependencies , parent .dependency .Child , flow , useDestructionSelector )
687689
@@ -707,44 +709,163 @@ func (sched *scheduler) fillDependencyGraph(rootContext *graphContext,
707709 }
708710 sr .usedInReplicas = []string {replicaName }
709711
710- block := & Block {
712+ vertex := interimGraphVertex {
711713 scheduledResource : sr ,
712714 dependency : dep ,
713715 parentContext : parentContext ,
714716 }
715-
716- blocks [dep .Child ] = append (blocks [dep .Child ], block )
717+ replicaVertices = append (replicaVertices , vertex )
717718
718719 if parent .scheduledResource != nil {
719720 sr .Requires = append (sr .Requires , parent .scheduledResource .Key ())
720721 parent .scheduledResource .RequiredBy = append (parent .scheduledResource .RequiredBy , sr .Key ())
721722 sr .Meta [parent .dependency .Child ] = dep .Meta
722723 }
723- queue .PushBack (block )
724+ queue .PushBack (vertex )
724725 }
725726 }
726- for _ , block := range blocks {
727- for _ , entry := range block {
728- key := entry .scheduledResource .Key ()
729- existingSr := rootContext .graph .graph [key ]
730- if existingSr == nil {
731- if ! silent {
732- log .Printf ("Adding resource %s to the dependency graph flow %s" , key , flow .Name )
727+ vertices = append (vertices , replicaVertices )
728+ }
729+
730+ if flow .Sequential {
731+ sched .concatenateReplicas (vertices , rootContext , rootContext .graph .Options ())
732+ } else {
733+ sched .mergeReplicas (vertices , rootContext , rootContext .graph .Options ())
734+ }
735+ return nil
736+ }
737+
738+ func (sched * scheduler ) mergeReplicas (vertices [][]interimGraphVertex , gc * graphContext ,
739+ options interfaces.DependencyGraphOptions ) {
740+
741+ for _ , replicaVertices := range vertices {
742+ sched .mergeInterimGraphVertices (replicaVertices , gc .graph .graph , options )
743+ }
744+ }
745+
746+ func (sched * scheduler ) concatenateReplicas (vertices [][]interimGraphVertex , gc * graphContext ,
747+ options interfaces.DependencyGraphOptions ) {
748+ graph := gc .graph .graph
749+ var previousReplicaGraph map [string ]* ScheduledResource
750+ for i , replicaVertices := range vertices {
751+ replicaGraph := map [string ]* ScheduledResource {}
752+ sched .mergeInterimGraphVertices (replicaVertices , replicaGraph , options )
753+
754+ if i > 0 {
755+ toReplace := map [string ]* ScheduledResource {}
756+ for key , sr := range replicaGraph {
757+ if graph [key ] != nil {
758+ toReplace [key ] = sr
759+ }
760+ }
761+ for key , sr := range toReplace {
762+ sched .toVoid (sr , i , graph )
763+ for _ , rKey := range sr .RequiredBy {
764+ requires := replicaGraph [rKey ].Requires
765+ for i , rKey2 := range requires {
766+ if rKey2 == key {
767+ requires [i ] = sr .Key ()
768+ break
769+ }
733770 }
734- rootContext . graph . graph [ key ] = entry . scheduledResource
735- } else {
736- sched . updateContext ( existingSr . context , entry . parentContext , entry . dependency )
737- existingSr . Requires = append ( existingSr . Requires , entry . scheduledResource . Requires ... )
738- existingSr . RequiredBy = append ( existingSr . RequiredBy , entry . scheduledResource . RequiredBy ... )
739- existingSr . usedInReplicas = append ( existingSr . usedInReplicas , entry . scheduledResource . usedInReplicas ... )
740- for metaKey , metaValue := range entry . scheduledResource . Meta {
741- existingSr . Meta [ metaKey ] = metaValue
771+ }
772+ for _ , rKey := range sr . Requires {
773+ requiredBy := replicaGraph [ rKey ]. RequiredBy
774+ for i , rKey2 := range requiredBy {
775+ if rKey2 == key {
776+ requiredBy [ i ] = sr . Key ( )
777+ break
778+ }
742779 }
743780 }
781+ delete (replicaGraph , key )
782+ replicaGraph [sr .Key ()] = sr
783+ }
784+
785+ syncPointName := fmt .Sprintf ("splice-%d" , i )
786+ syncPoint , _ := sched .newScheduledResource (
787+ "void" , syncPointName , "" , nil , gc , true )
788+ syncPointName = "void/" + syncPointName
789+ graph [syncPointName ] = syncPoint
790+
791+ for _ , leafName := range getLeafs (previousReplicaGraph ) {
792+ leaf := previousReplicaGraph [leafName ]
793+ syncPoint .Requires = append (syncPoint .Requires , leafName )
794+ leaf .RequiredBy = append (leaf .RequiredBy , syncPointName )
795+ }
796+ for _ , rootName := range getRoots (replicaGraph ) {
797+ root := replicaGraph [rootName ]
798+ syncPoint .RequiredBy = append (syncPoint .RequiredBy , rootName )
799+ root .Requires = append (root .Requires , syncPointName )
800+ }
801+ }
802+ previousReplicaGraph = replicaGraph
803+
804+ for key , value := range replicaGraph {
805+ graph [key ] = value
806+ }
807+
808+ }
809+ }
810+
811+ func getRoots (graph map [string ]* ScheduledResource ) []string {
812+ var result []string
813+ for key , sr := range graph {
814+ if len (sr .Requires ) == 0 {
815+ result = append (result , key )
816+ }
817+ }
818+ return result
819+ }
820+
821+ func getLeafs (graph map [string ]* ScheduledResource ) []string {
822+ var result []string
823+ for key , sr := range graph {
824+ if len (sr .RequiredBy ) == 0 {
825+ result = append (result , key )
826+ }
827+ }
828+ return result
829+ }
830+
831+ func (sched * scheduler ) toVoid (sr * ScheduledResource , index int , graph map [string ]* ScheduledResource ) {
832+ kind , name , _ , _ := keyParts (sr .Key ())
833+ var newName string
834+ for i := 1 ; ; i ++ {
835+ newName = fmt .Sprintf ("%s-%s-%d" , kind , name , index + 1 )
836+ if i > 1 {
837+ newName = newName + fmt .Sprintf ("_%d" , i )
838+ }
839+ if graph ["void/" + newName ] == nil {
840+ break
841+ }
842+ }
843+ resourceTemplate , _ := resources .KindToResourceTemplate ["void" ]
844+ res , _ , _ := sched .newResource ("void" , newName , nil , sr .context , resourceTemplate , true )
845+ sr .Resource = res
846+ }
847+
848+ func (sched * scheduler ) mergeInterimGraphVertices (vertices []interimGraphVertex , graph map [string ]* ScheduledResource ,
849+ options interfaces.DependencyGraphOptions ) {
850+
851+ for _ , entry := range vertices {
852+ key := entry .scheduledResource .Key ()
853+ existingSr := graph [key ]
854+ if existingSr == nil {
855+ if ! options .Silent {
856+ log .Printf ("Adding resource %s to the dependency graph flow %s" , key , options .FlowName )
857+ }
858+ graph [key ] = entry .scheduledResource
859+ } else {
860+ sched .updateContext (existingSr .context , entry .parentContext , entry .dependency )
861+ existingSr .Requires = append (existingSr .Requires , entry .scheduledResource .Requires ... )
862+ existingSr .RequiredBy = append (existingSr .RequiredBy , entry .scheduledResource .RequiredBy ... )
863+ existingSr .usedInReplicas = append (existingSr .usedInReplicas , entry .scheduledResource .usedInReplicas ... )
864+ for metaKey , metaValue := range entry .scheduledResource .Meta {
865+ existingSr .Meta [metaKey ] = metaValue
744866 }
745867 }
746868 }
747- return nil
748869}
749870
750871// getResourceDestructors builds a list of functions, each of them delete one of replica resources
0 commit comments