Skip to content

Commit 67031e7

Browse files
author
Stan Lagun
committed
Sequential flow replication
For sequential flow, each next replica is attached to the leafs of previous one so that they will be deployed sequentially
1 parent d1a58cb commit 67031e7

4 files changed

Lines changed: 207 additions & 28 deletions

File tree

examples/etcd/resdefs/scale-flow.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ metadata:
44
name: etcd-scale
55

66
exported: true
7+
sequential: true
8+
79
construction:
810
flow: etcd-scale
911
destruction:

pkg/client/flows.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,9 @@ type Flow struct {
4646
// can only be triggered by other flows (including DEFAULT flow which is exported by-default)
4747
Exported bool `json:"exported,omitempty"`
4848

49+
// Flow replicas must be deployed sequentially, one by one
50+
Sequential bool `json:"sequential,omitempty"`
51+
4952
// Parameters that the flow can accept (i.e. valid inputs for the flow)
5053
Parameters map[string]FlowParameter `json:"parameters,omitempty"`
5154

pkg/scheduler/dependency_graph.go

Lines changed: 149 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -661,27 +661,29 @@ func (sched *scheduler) BuildDependencyGraph(options interfaces.DependencyGraphO
661661
return depGraph, nil
662662
}
663663

664+
type interimGraphVertex struct {
665+
dependency client.Dependency
666+
scheduledResource *ScheduledResource
667+
parentContext *graphContext
668+
}
669+
664670
func (sched *scheduler) fillDependencyGraph(rootContext *graphContext,
665671
resDefs map[string]client.ResourceDefinition,
666672
dependencies map[string][]client.Dependency,
667673
flow *client.Flow, replicas []client.Replica, useDestructionSelector bool) error {
668674

669-
type Block struct {
670-
dependency client.Dependency
671-
scheduledResource *ScheduledResource
672-
parentContext *graphContext
673-
}
674-
blocks := map[string][]*Block{}
675+
var vertices [][]interimGraphVertex
675676
silent := rootContext.graph.Options().Silent
676677

677678
for _, replica := range replicas {
679+
var replicaVertices []interimGraphVertex
678680
replicaName := replica.ReplicaName()
679681
replicaContext := sched.prepareContext(rootContext, nil, replicaName)
680682
queue := list.New()
681-
queue.PushFront(&Block{dependency: client.Dependency{Child: "flow/" + flow.Name}})
683+
queue.PushFront(interimGraphVertex{dependency: client.Dependency{Child: "flow/" + flow.Name}})
682684

683685
for e := queue.Front(); e != nil; e = e.Next() {
684-
parent := e.Value.(*Block)
686+
parent := e.Value.(interimGraphVertex)
685687

686688
deps := filterDependencies(dependencies, parent.dependency.Child, flow, useDestructionSelector)
687689

@@ -707,44 +709,163 @@ func (sched *scheduler) fillDependencyGraph(rootContext *graphContext,
707709
}
708710
sr.usedInReplicas = []string{replicaName}
709711

710-
block := &Block{
712+
vertex := interimGraphVertex{
711713
scheduledResource: sr,
712714
dependency: dep,
713715
parentContext: parentContext,
714716
}
715-
716-
blocks[dep.Child] = append(blocks[dep.Child], block)
717+
replicaVertices = append(replicaVertices, vertex)
717718

718719
if parent.scheduledResource != nil {
719720
sr.Requires = append(sr.Requires, parent.scheduledResource.Key())
720721
parent.scheduledResource.RequiredBy = append(parent.scheduledResource.RequiredBy, sr.Key())
721722
sr.Meta[parent.dependency.Child] = dep.Meta
722723
}
723-
queue.PushBack(block)
724+
queue.PushBack(vertex)
724725
}
725726
}
726-
for _, block := range blocks {
727-
for _, entry := range block {
728-
key := entry.scheduledResource.Key()
729-
existingSr := rootContext.graph.graph[key]
730-
if existingSr == nil {
731-
if !silent {
732-
log.Printf("Adding resource %s to the dependency graph flow %s", key, flow.Name)
727+
vertices = append(vertices, replicaVertices)
728+
}
729+
730+
if flow.Sequential {
731+
sched.concatenateReplicas(vertices, rootContext, rootContext.graph.Options())
732+
} else {
733+
sched.mergeReplicas(vertices, rootContext, rootContext.graph.Options())
734+
}
735+
return nil
736+
}
737+
738+
func (sched *scheduler) mergeReplicas(vertices [][]interimGraphVertex, gc *graphContext,
739+
options interfaces.DependencyGraphOptions) {
740+
741+
for _, replicaVertices := range vertices {
742+
sched.mergeInterimGraphVertices(replicaVertices, gc.graph.graph, options)
743+
}
744+
}
745+
746+
func (sched *scheduler) concatenateReplicas(vertices [][]interimGraphVertex, gc *graphContext,
747+
options interfaces.DependencyGraphOptions) {
748+
graph := gc.graph.graph
749+
var previousReplicaGraph map[string]*ScheduledResource
750+
for i, replicaVertices := range vertices {
751+
replicaGraph := map[string]*ScheduledResource{}
752+
sched.mergeInterimGraphVertices(replicaVertices, replicaGraph, options)
753+
754+
if i > 0 {
755+
toReplace := map[string]*ScheduledResource{}
756+
for key, sr := range replicaGraph {
757+
if graph[key] != nil {
758+
toReplace[key] = sr
759+
}
760+
}
761+
for key, sr := range toReplace {
762+
sched.toVoid(sr, i, graph)
763+
for _, rKey := range sr.RequiredBy {
764+
requires := replicaGraph[rKey].Requires
765+
for i, rKey2 := range requires {
766+
if rKey2 == key {
767+
requires[i] = sr.Key()
768+
break
769+
}
733770
}
734-
rootContext.graph.graph[key] = entry.scheduledResource
735-
} else {
736-
sched.updateContext(existingSr.context, entry.parentContext, entry.dependency)
737-
existingSr.Requires = append(existingSr.Requires, entry.scheduledResource.Requires...)
738-
existingSr.RequiredBy = append(existingSr.RequiredBy, entry.scheduledResource.RequiredBy...)
739-
existingSr.usedInReplicas = append(existingSr.usedInReplicas, entry.scheduledResource.usedInReplicas...)
740-
for metaKey, metaValue := range entry.scheduledResource.Meta {
741-
existingSr.Meta[metaKey] = metaValue
771+
}
772+
for _, rKey := range sr.Requires {
773+
requiredBy := replicaGraph[rKey].RequiredBy
774+
for i, rKey2 := range requiredBy {
775+
if rKey2 == key {
776+
requiredBy[i] = sr.Key()
777+
break
778+
}
742779
}
743780
}
781+
delete(replicaGraph, key)
782+
replicaGraph[sr.Key()] = sr
783+
}
784+
785+
syncPointName := fmt.Sprintf("splice-%d", i)
786+
syncPoint, _ := sched.newScheduledResource(
787+
"void", syncPointName, "", nil, gc, true)
788+
syncPointName = "void/" + syncPointName
789+
graph[syncPointName] = syncPoint
790+
791+
for _, leafName := range getLeafs(previousReplicaGraph) {
792+
leaf := previousReplicaGraph[leafName]
793+
syncPoint.Requires = append(syncPoint.Requires, leafName)
794+
leaf.RequiredBy = append(leaf.RequiredBy, syncPointName)
795+
}
796+
for _, rootName := range getRoots(replicaGraph) {
797+
root := replicaGraph[rootName]
798+
syncPoint.RequiredBy = append(syncPoint.RequiredBy, rootName)
799+
root.Requires = append(root.Requires, syncPointName)
800+
}
801+
}
802+
previousReplicaGraph = replicaGraph
803+
804+
for key, value := range replicaGraph {
805+
graph[key] = value
806+
}
807+
808+
}
809+
}
810+
811+
func getRoots(graph map[string]*ScheduledResource) []string {
812+
var result []string
813+
for key, sr := range graph {
814+
if len(sr.Requires) == 0 {
815+
result = append(result, key)
816+
}
817+
}
818+
return result
819+
}
820+
821+
func getLeafs(graph map[string]*ScheduledResource) []string {
822+
var result []string
823+
for key, sr := range graph {
824+
if len(sr.RequiredBy) == 0 {
825+
result = append(result, key)
826+
}
827+
}
828+
return result
829+
}
830+
831+
func (sched *scheduler) toVoid(sr *ScheduledResource, index int, graph map[string]*ScheduledResource) {
832+
kind, name, _, _ := keyParts(sr.Key())
833+
var newName string
834+
for i := 1; ; i++ {
835+
newName = fmt.Sprintf("%s-%s-%d", kind, name, index+1)
836+
if i > 1 {
837+
newName = newName + fmt.Sprintf("_%d", i)
838+
}
839+
if graph["void/"+newName] == nil {
840+
break
841+
}
842+
}
843+
resourceTemplate, _ := resources.KindToResourceTemplate["void"]
844+
res, _, _ := sched.newResource("void", newName, nil, sr.context, resourceTemplate, true)
845+
sr.Resource = res
846+
}
847+
848+
func (sched *scheduler) mergeInterimGraphVertices(vertices []interimGraphVertex, graph map[string]*ScheduledResource,
849+
options interfaces.DependencyGraphOptions) {
850+
851+
for _, entry := range vertices {
852+
key := entry.scheduledResource.Key()
853+
existingSr := graph[key]
854+
if existingSr == nil {
855+
if !options.Silent {
856+
log.Printf("Adding resource %s to the dependency graph flow %s", key, options.FlowName)
857+
}
858+
graph[key] = entry.scheduledResource
859+
} else {
860+
sched.updateContext(existingSr.context, entry.parentContext, entry.dependency)
861+
existingSr.Requires = append(existingSr.Requires, entry.scheduledResource.Requires...)
862+
existingSr.RequiredBy = append(existingSr.RequiredBy, entry.scheduledResource.RequiredBy...)
863+
existingSr.usedInReplicas = append(existingSr.usedInReplicas, entry.scheduledResource.usedInReplicas...)
864+
for metaKey, metaValue := range entry.scheduledResource.Meta {
865+
existingSr.Meta[metaKey] = metaValue
744866
}
745867
}
746868
}
747-
return nil
748869
}
749870

750871
// getResourceDestructors builds a list of functions, each of them delete one of replica resources

pkg/scheduler/flows_test.go

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1367,3 +1367,56 @@ func TestSyncOnVoidResource(t *testing.T) {
13671367
depGraph.Deploy(stopChan)
13681368
ensureReplicas(c, t, replicaCount, replicaCount)
13691369
}
1370+
1371+
// TestSequentialReplication tests that resources of sequentially replicated flows create in right order
1372+
func TestSequentialReplication(t *testing.T) {
1373+
replicaCount := 3
1374+
flow := mocks.MakeFlow("test")
1375+
flow.Flow.Sequential = true
1376+
1377+
c, fake := mocks.NewClientWithFake(
1378+
flow,
1379+
mocks.MakeResourceDefinition("pod/ready-$AC_NAME"),
1380+
mocks.MakeResourceDefinition("secret/secret"),
1381+
mocks.MakeResourceDefinition("job/ready-$AC_NAME"),
1382+
mocks.MakeDependency("flow/test", "pod/ready-$AC_NAME", "flow=test"),
1383+
mocks.MakeDependency("pod/ready-$AC_NAME", "secret/secret", "flow=test"),
1384+
mocks.MakeDependency("secret/secret", "job/ready-$AC_NAME", "flow=test"),
1385+
)
1386+
1387+
stopChan := make(chan struct{})
1388+
var deployed []string
1389+
fake.PrependReactor("create", "*",
1390+
func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) {
1391+
resource := action.GetResource().Resource
1392+
if resource != "replica" {
1393+
deployed = append(deployed, resource)
1394+
}
1395+
1396+
return false, nil, nil
1397+
})
1398+
1399+
depGraph, err := New(c, nil, 0).BuildDependencyGraph(
1400+
interfaces.DependencyGraphOptions{ReplicaCount: replicaCount, FlowName: "test"})
1401+
if err != nil {
1402+
t.Fatal(err)
1403+
}
1404+
1405+
graph := depGraph.(*dependencyGraph).graph
1406+
if len(graph) != 3*replicaCount+replicaCount-1 {
1407+
t.Error("wrong dependency graph length")
1408+
}
1409+
1410+
depGraph.Deploy(stopChan)
1411+
expected := []string{"pods", "secrets", "jobs", "pods", "jobs", "pods", "jobs"}
1412+
if len(deployed) != len(expected) {
1413+
t.Fatal("invalid resource sequence", deployed)
1414+
}
1415+
for i, r := range deployed {
1416+
if expected[i] != r {
1417+
t.Fatal("invalid resource sequence")
1418+
}
1419+
}
1420+
1421+
ensureReplicas(c, t, replicaCount, replicaCount)
1422+
}

0 commit comments

Comments
 (0)