From 9ed8be686f3440636a125ad5d77bb498c05198fe Mon Sep 17 00:00:00 2001 From: Stan Lagun Date: Sat, 20 May 2017 18:22:22 -0700 Subject: [PATCH 1/3] Dependency replication This change adds ability to replicate dependency with index parameters iterated over arbitrary number of lists. For each dependency it is now possible to specify map of indexVariableName -> listExpression listExpression := range|item + [, listExpression] range := number '..' number item := STRING for example, if for "i: 1..3" the dependency will be replicated into 3 clones, each one of them having argument i set to value in range [1, 3] This also allows to consume N flow replicas by replicating the dependency that leads to the consumed flow --- docs/flows.md | 92 +++++++++++++++- pkg/client/dependencies.go | 3 + pkg/interfaces/interfaces.go | 1 - pkg/resources/flow.go | 12 +-- pkg/scheduler/dependency_graph.go | 144 +++++++++++++++++++++---- pkg/scheduler/dependency_graph_test.go | 74 +++++++++++++ pkg/scheduler/flows_test.go | 99 +++++++++++++++++ 7 files changed, 394 insertions(+), 31 deletions(-) diff --git a/docs/flows.md b/docs/flows.md index 9d42583..36d82d1 100644 --- a/docs/flows.md +++ b/docs/flows.md @@ -195,7 +195,7 @@ my-flow -> -> pod/pod-$arg ``` will create two pods: `pod-a` and `pod-b`. -## Replication of flows +## Replication Flow replication is an AppController feature that makes specified number of flow graph copies, each one with a unique name and then merges them into a single graph. Because each replica name may be used in some of resource @@ -234,6 +234,96 @@ If there were 7 of them, 4 replicas would be deleted.\ `kubeac run my-flow` if there are no replicas exist, create one, otherwise validate status of resources of existing replicas. +### Replication of dependencies + +With commandline parameters one can create number of flow replicas. But sometimes there is a need to have flow +that creates several replicas of another flow, or just several resources with the same specification that differ +only in name. + +One possible solution is to utilize technique shown above: make parameter value be part of resource name and +then duplicate the dependency that leads to this resource and pass different parameter value along each of +dependencies. This works well for small and fixed number of replicas. But if the number goes big, it becomes hard +to manage such number of dependency objects. Moreover if the number itself is not fixed but rather passed as a +parameter replicating resource by manual replication of dependencies becomes impossible. + +Luckily, the dependencies can be automatically replicated. This is done through the `generateFor` field of the +`Dependency` object. `generateFor` is a map where keys are argument names and values are list expressions. Each list +expression is comma-separated list of values. If the value has a form of `number..number`, it is expended into a +list of integers in the given range. For example `"1..3, 10..11, abc"` will turn into `["1", "2", "3", "10", "11", "abc"]`. +Then the dependency is going to be replicated automatically with each replica getting on of the list values as an +additional argument. There can be several `generateFor` arguments. In this case there is going to be one dependency +for each combination of the list values. For example, + +```YAML +apiVersion: appcontroller.k8s/v1alpha1 +kind: Dependency +metadata: + name: dependency +parent: pod/podName +child: flow/flowName-$x-$y +generateFor: + x: 1..2 + y: a, b +``` + +has the same effect as + +```YAML +apiVersion: appcontroller.k8s/v1alpha1 +kind: Dependency +metadata: + name: dependency1 +parent: pod/podName +child: flow/flowName-$x-$y +args: + x: 1 + y: a +--- +apiVersion: appcontroller.k8s/v1alpha1 +kind: Dependency +metadata: + name: dependency2 +parent: pod/podName +child: flow/flowName-$x-$y +args: + x: 2 + y: a +--- +apiVersion: appcontroller.k8s/v1alpha1 +kind: Dependency +metadata: + name: dependency3 +parent: pod/podName +child: flow/flowName-$x-$y +args: + x: 1 + y: b +--- +apiVersion: appcontroller.k8s/v1alpha1 +kind: Dependency +metadata: + name: dependency4 +parent: pod/podName +child: flow/flowName-$x-$y +args: + x: 2 + y: b +``` + +Besides simplifying the dependency graph, dependency replication makes possible to have dynamic number of replicas +by using parameter value right inside the list expressions: + +```YAML +apiVersion: appcontroller.k8s/v1alpha1 +kind: Dependency +metadata: + name: dependency +parent: pod/podName +child: flow/flowName-$index +generateFor: + index: 1..$replicaCount +``` + ### Replica-spaces and contexts Replica-space, is a tag that all replicas of the flow share. When new `Replica` object for the flow is created, diff --git a/pkg/client/dependencies.go b/pkg/client/dependencies.go index 075b48d..5b72158 100644 --- a/pkg/client/dependencies.go +++ b/pkg/client/dependencies.go @@ -41,6 +41,9 @@ type Dependency struct { // Arguments passed to dependent resource Args map[string]string `json:"args,omitempty"` + + // map of variable name -> list expression. New dependencies are generated by replication and iteration over those lists + GenerateFor map[string]string `json:"generateFor,omitempty"` } // DependencyList is a k8s object representing list of dependencies diff --git a/pkg/interfaces/interfaces.go b/pkg/interfaces/interfaces.go index 384c010..929d627 100644 --- a/pkg/interfaces/interfaces.go +++ b/pkg/interfaces/interfaces.go @@ -82,7 +82,6 @@ type GraphContext interface { Scheduler() Scheduler GetArg(string) string Graph() DependencyGraph - Dependency() *client.Dependency } // DependencyGraphOptions contains all the input required to build a dependency graph diff --git a/pkg/resources/flow.go b/pkg/resources/flow.go index 564801f..4f7c0ce 100644 --- a/pkg/resources/flow.go +++ b/pkg/resources/flow.go @@ -15,9 +15,7 @@ package resources import ( - "fmt" "log" - "strings" "github.com/Mirantis/k8s-AppController/pkg/client" "github.com/Mirantis/k8s-AppController/pkg/interfaces" @@ -29,7 +27,6 @@ type flow struct { flow *client.Flow context interfaces.GraphContext originalName string - instanceName string currentGraph interfaces.DependencyGraph } @@ -52,19 +49,12 @@ func (flowTemplateFactory) Kind() string { func (flowTemplateFactory) New(def client.ResourceDefinition, c client.Interface, gc interfaces.GraphContext) interfaces.Resource { newFlow := parametrizeResource(def.Flow, gc, []string{"*"}).(*client.Flow) - dep := gc.Dependency() - var depName string - if dep != nil { - depName = strings.Replace(dep.Name, dep.GenerateName, "", 1) - } - return report.SimpleReporter{ BaseResource: &flow{ Base: Base{def.Meta}, flow: newFlow, context: gc, originalName: def.Flow.Name, - instanceName: fmt.Sprintf("%s%s", depName, gc.GetArg("AC_NAME")), }} } @@ -98,7 +88,7 @@ func (f *flow) buildDependencyGraph(replicaCount int, silent bool) (interfaces.D options := interfaces.DependencyGraphOptions{ FlowName: f.originalName, Args: args, - FlowInstanceName: f.instanceName, + FlowInstanceName: f.context.GetArg("AC_ID"), ReplicaCount: replicaCount, Silent: silent, FixedNumberOfReplicas: fixedNumberOfReplicas, diff --git a/pkg/scheduler/dependency_graph.go b/pkg/scheduler/dependency_graph.go index 8dab597..737840d 100644 --- a/pkg/scheduler/dependency_graph.go +++ b/pkg/scheduler/dependency_graph.go @@ -19,6 +19,7 @@ import ( "fmt" "log" "sort" + "strconv" "strings" "time" @@ -40,12 +41,12 @@ type dependencyGraph struct { } type graphContext struct { - args map[string]string - graph *dependencyGraph - scheduler *scheduler - flow *client.Flow - dependency *client.Dependency - replica string + args map[string]string + graph *dependencyGraph + scheduler *scheduler + flow *client.Flow + id string + replica string } var _ interfaces.GraphContext = &graphContext{} @@ -62,6 +63,8 @@ func (gc graphContext) GetArg(name string) string { return gc.replica case "AC_FLOW_NAME": return gc.flow.Name + case "AC_ID": + return gc.id default: val, ok := gc.args[name] if ok { @@ -84,11 +87,6 @@ func (gc graphContext) Graph() interfaces.DependencyGraph { return gc.graph } -// Dependency returns Dependency for which child is the resource being created with this context -func (gc graphContext) Dependency() *client.Dependency { - return gc.dependency -} - // newScheduledResourceFor returns new scheduled resource for given resource in init state func newScheduledResourceFor(r interfaces.Resource, suffix string, context *graphContext, existing bool) *ScheduledResource { return &ScheduledResource{ @@ -159,7 +157,9 @@ func groupDependencies(dependencies []client.Dependency, defaultFlow = []client.Dependency{} addResource := func(name string) { if !strings.HasPrefix(name, "flow/") && !isDependant[name] { - defaultFlow = append(defaultFlow, client.Dependency{Parent: defaultFlowName, Child: name}) + dep := client.Dependency{Parent: defaultFlowName, Child: name} + dep.Name = name + defaultFlow = append(defaultFlow, dep) isDependant[name] = true } } @@ -328,11 +328,11 @@ func getArgFunc(gc interfaces.GraphContext) func(string) string { func (sched *scheduler) prepareContext(parentContext *graphContext, dependency *client.Dependency, replica string) *graphContext { context := &graphContext{ - scheduler: sched, - graph: parentContext.graph, - flow: parentContext.flow, - replica: replica, - dependency: dependency, + scheduler: sched, + graph: parentContext.graph, + flow: parentContext.flow, + replica: replica, + id: getVertexID(dependency, replica), } context.args = make(map[string]string) @@ -344,6 +344,15 @@ func (sched *scheduler) prepareContext(parentContext *graphContext, dependency * return context } +func getVertexID(dependency *client.Dependency, replica string) string { + var depName string + if dependency != nil { + depName = strings.Replace(dependency.Name, dependency.GenerateName, "", 1) + } + depName += replica + return depName +} + func (sched *scheduler) updateContext(context, parentContext *graphContext, dependency client.Dependency) { for key, value := range dependency.Args { context.args[key] = copier.EvaluateString(value, parentContext.GetArg) @@ -661,6 +670,105 @@ func (sched *scheduler) BuildDependencyGraph(options interfaces.DependencyGraphO return depGraph, nil } +func listDependencies(dependencies map[string][]client.Dependency, parent string, flow *client.Flow, + useDestructionSelector bool, context *graphContext) []client.Dependency { + + deps := filterDependencies(dependencies, parent, flow, useDestructionSelector) + var result []client.Dependency + for _, dep := range deps { + if len(dep.GenerateFor) == 0 { + result = append(result, dep) + continue + } + + var keys []string + for k := range dep.GenerateFor { + keys = append(keys, k) + } + sort.Strings(keys) + lists := make([][]string, len(dep.GenerateFor)) + for i, key := range keys { + lists[i] = expandListExpression(copier.EvaluateString(dep.GenerateFor[key], getArgFunc(context))) + } + for n, combination := range permute(lists) { + newArgs := make(map[string]string, len(dep.Args)+len(keys)) + for k, v := range dep.Args { + newArgs[k] = v + } + for i, key := range keys { + newArgs[key] = combination[i] + } + depCopy := dep + depCopy.Args = newArgs + depCopy.Name += strconv.Itoa(n + 1) + result = append(result, depCopy) + } + } + return result +} + +func permute(variants [][]string) [][]string { + switch len(variants) { + case 0: + return variants + case 1: + var result [][]string + for _, v := range variants[0] { + result = append(result, []string{v}) + } + return result + default: + var result [][]string + for _, tail := range variants[len(variants)-1] { + for _, p := range permute(variants[:len(variants)-1]) { + result = append(result, append(p, tail)) + } + } + return result + } +} + +func expandListExpression(expr string) []string { + var result []string + for _, part := range strings.Split(expr, ",") { + part = strings.TrimSpace(part) + if part == "" { + continue + } + + isRange := true + var from, to int + + rangeParts := strings.SplitN(part, "..", 2) + if len(rangeParts) != 2 { + isRange = false + } + + var err error + if isRange { + from, err = strconv.Atoi(rangeParts[0]) + if err != nil { + isRange = false + } + } + if isRange { + to, err = strconv.Atoi(rangeParts[1]) + if err != nil { + isRange = false + } + } + + if isRange { + for i := from; i <= to; i++ { + result = append(result, strconv.Itoa(i)) + } + } else { + result = append(result, part) + } + } + return result +} + func (sched *scheduler) fillDependencyGraph(rootContext *graphContext, resDefs map[string]client.ResourceDefinition, dependencies map[string][]client.Dependency, @@ -683,7 +791,7 @@ func (sched *scheduler) fillDependencyGraph(rootContext *graphContext, for e := queue.Front(); e != nil; e = e.Next() { parent := e.Value.(*Block) - deps := filterDependencies(dependencies, parent.dependency.Child, flow, useDestructionSelector) + deps := listDependencies(dependencies, parent.dependency.Child, flow, useDestructionSelector, replicaContext) for _, dep := range deps { if parent.scheduledResource != nil && strings.HasPrefix(parent.scheduledResource.Key(), "flow/") { diff --git a/pkg/scheduler/dependency_graph_test.go b/pkg/scheduler/dependency_graph_test.go index 9cc7068..07ae319 100644 --- a/pkg/scheduler/dependency_graph_test.go +++ b/pkg/scheduler/dependency_graph_test.go @@ -15,6 +15,7 @@ package scheduler import ( + "strings" "testing" "github.com/Mirantis/k8s-AppController/pkg/client" @@ -319,3 +320,76 @@ func TestDependencyToFlowMatching(t *testing.T) { } } } + +// TestPermute tests permute function +func TestPermute(t *testing.T) { + alphabets := [][]string{ + {"1", "2", "3"}, + {"+", "-"}, + {"a", "b"}, + {"="}, + } + + expected := map[string]bool{ + "1+a=": true, + "1+b=": true, + "1-a=": true, + "1-b=": true, + "2+a=": true, + "2+b=": true, + "2-a=": true, + "2-b=": true, + "3+a=": true, + "3+b=": true, + "3-a=": true, + "3-b=": true, + } + permutations := permute(alphabets) + for _, combination := range permutations { + combinationStr := strings.Join(combination, "") + if !expected[combinationStr] { + t.Errorf("unexpected combination %s", combinationStr) + } else { + delete(expected, combinationStr) + } + } + if len(expected) != 0 { + t.Error("not all combinations were generated") + } + + alphabets = append(alphabets, make([]string, 0)) + if len(permute(alphabets)) != 0 { + t.Error("empty alphabet didin't result in empty permutation list") + } +} + +// TestExpendListExpression tests list expression translation to list of strings +func TestExpendListExpression(t *testing.T) { + table := map[string][]string{ + "1": {"1"}, + "1..5": {"1", "2", "3", "4", "5"}, + "2..-1": {}, + "a, b": {"a", "b"}, + "a, b, 2..4": {"a", "b", "2", "3", "4"}, + "-1..1, 2..4, x": {"-1", "0", "1", "2", "3", "4", "x"}, + "a..b": {"a..b"}, + "..": {".."}, + "1...3": {"1...3"}, + "1..b": {"1..b"}, + "a..b, 1..3": {"a..b", "1", "2", "3"}, + "a..b, c..d": {"a..b", "c..d"}, + "": {}, + } + for expr, expected := range table { + result := expandListExpression(expr) + if len(result) != len(expected) { + t.Errorf("unexpected result length for expression %s: %d != %d", expr, len(result), len(expected)) + } else { + for i := range expected { + if expected[i] != result[i] { + t.Errorf("invalid entry %d for expression %s: %s != %s", i, expr, expected[i], result[i]) + } + } + } + } +} diff --git a/pkg/scheduler/flows_test.go b/pkg/scheduler/flows_test.go index 464865c..700e7e0 100644 --- a/pkg/scheduler/flows_test.go +++ b/pkg/scheduler/flows_test.go @@ -1367,3 +1367,102 @@ func TestSyncOnVoidResource(t *testing.T) { depGraph.Deploy(stopChan) ensureReplicas(c, t, replicaCount, replicaCount) } + +// TestConsumeReplicatedFlow tests case, where each replica of the outer flow consumes N replicas of another flow +// by replicating dependency which leads to the consumed flow +func TestConsumeReplicatedFlow(t *testing.T) { + dep := mocks.MakeDependency("flow/outer", "flow/inner/$AC_NAME-$i", "flow=outer") + dep.GenerateFor = map[string]string{"i": "1..3"} + + c := mocks.NewClient( + mocks.MakeFlow("inner"), + mocks.MakeFlow("outer"), + mocks.MakeResourceDefinition("job/ready-$AC_NAME"), + dep, + mocks.MakeDependency("flow/inner", "job/ready-$AC_NAME", "flow=inner"), + ) + depGraph, err := New(c, nil, 0).BuildDependencyGraph( + interfaces.DependencyGraphOptions{ReplicaCount: 2, FlowName: "outer"}) + if err != nil { + t.Fatal(err) + } + stopChan := make(chan struct{}) + depGraph.Deploy(stopChan) + + ensureReplicas(c, t, 2*3, 3*2+2) +} + +// TestComplexDependencyReplication tests complex dependency generation over two list expressions +func TestComplexDependencyReplication(t *testing.T) { + dep := mocks.MakeDependency("flow/test", "job/ready-$x-$y", "flow=test") + dep.GenerateFor = map[string]string{ + "x": "1..3, 8..9", + "y": "a, b", + } + + c := mocks.NewClient( + mocks.MakeFlow("test"), + mocks.MakeResourceDefinition("job/ready-$x-$y"), + dep, + ) + depGraph, err := New(c, nil, 0).BuildDependencyGraph( + interfaces.DependencyGraphOptions{ReplicaCount: 1, FlowName: "test"}) + if err != nil { + t.Fatal(err) + } + stopChan := make(chan struct{}) + depGraph.Deploy(stopChan) + + expectedJobNames := map[string]bool{ + "ready-1-a": true, + "ready-2-a": true, + "ready-3-a": true, + "ready-8-a": true, + "ready-9-a": true, + "ready-1-b": true, + "ready-2-b": true, + "ready-3-b": true, + "ready-8-b": true, + "ready-9-b": true, + } + jobs := ensureReplicas(c, t, len(expectedJobNames), 1) + for _, j := range jobs { + if !expectedJobNames[j.Name] { + t.Errorf("unexpected job %s", j.Name) + } else { + delete(expectedJobNames, j.Name) + } + } + if len(expectedJobNames) != 0 { + t.Error("not all jobs were found") + } +} + +// TestDynamicDependencyReplication tests that variables can be used in list expressions used for dependency replication +func TestDynamicDependencyReplication(t *testing.T) { + flow := mocks.MakeFlow("test") + flow.Flow.Parameters = map[string]client.FlowParameter{ + "replicaCount": mocks.MakeFlowParameter("1"), + } + + dep := mocks.MakeDependency("flow/test", "job/ready-$index", "flow=test") + dep.GenerateFor = map[string]string{ + "index": "1..$replicaCount", + } + + c := mocks.NewClient( + flow, + mocks.MakeResourceDefinition("job/ready-$index"), + dep, + ) + depGraph, err := New(c, nil, 0).BuildDependencyGraph( + interfaces.DependencyGraphOptions{ReplicaCount: 1, FlowName: "test", + Args: map[string]string{"replicaCount": "7"}}) + if err != nil { + t.Fatal(err) + } + stopChan := make(chan struct{}) + depGraph.Deploy(stopChan) + + ensureReplicas(c, t, 7, 1) +} From 4a8d7b0e84bf148d6e32b3032c76177a7ae8cb9e Mon Sep 17 00:00:00 2001 From: Stan Lagun Date: Wed, 24 May 2017 22:27:27 -0700 Subject: [PATCH 2/3] Sequential flow replication For sequential flow, each next replica is attached to the leafs of previous one so that they will be deployed sequentially --- docs/flows.md | 9 ++ examples/etcd/README.md | 3 +- examples/etcd/resdefs/scale-flow.yaml | 2 + pkg/client/flows.go | 3 + pkg/resources/flow.go | 13 +- pkg/scheduler/dependency_graph.go | 167 +++++++++++++++++++++----- pkg/scheduler/flows_test.go | 82 +++++++++++++ 7 files changed, 237 insertions(+), 42 deletions(-) diff --git a/docs/flows.md b/docs/flows.md index 36d82d1..eb53890 100644 --- a/docs/flows.md +++ b/docs/flows.md @@ -63,6 +63,7 @@ destruction: replicaSpace: optional-name exported: true +sequential: true parameters: parameterName1: @@ -351,6 +352,14 @@ another flow will "see" only its own replicas so the `Flow` resource can always However, when the flow is run independently, it will not have any context and thus query replicas based on replica-space alone, which means it will get all the replicas from all contexts. +### Sequential flows + +By default, if flow has more than one replica, generated dependency graph would have each replica subgraph attached +to the graph root vertex (the `Flow` vertex). When deployed, resources of all replicas are going to be created in +parallel. However, in some cases it is desired that replicas be deployed sequentially, one by one. This can be achieved +by setting `sequential` attribute of the `Flow` to `true`. For sequential flows each replica roots get attached to the +leaf vertices of previous one. + ## Scheduling flow deployments When user runs `kubeac run something` the deployment does not happen immediately (unless there is also a `--deploy` diff --git a/examples/etcd/README.md b/examples/etcd/README.md index cd945de..22fe40c 100644 --- a/examples/etcd/README.md +++ b/examples/etcd/README.md @@ -26,8 +26,7 @@ If omitted, `etcd` name is used by default. `kubectl exec k8s-appcontroller kubeac run etcd-scale -n +1 --arg clusterName=my-cluster` `-n +1` - adds one node to the cluster. Use `-n -1` to scale the cluster down by one node. In this case the last -added node is going to be deleted. At the moment it is only possible to scale cluster up by one node at a time. -However, any number of nodes can be removed. Note, that this can also remove nodes created upon initial deployment. +added node is going to be deleted. This flow can also remove nodes created upon initial deployment. `--arg clusterName=my-cluster` - name of the cluster to scale (`etcd` if not specified). diff --git a/examples/etcd/resdefs/scale-flow.yaml b/examples/etcd/resdefs/scale-flow.yaml index 4394b4b..4fee091 100644 --- a/examples/etcd/resdefs/scale-flow.yaml +++ b/examples/etcd/resdefs/scale-flow.yaml @@ -4,6 +4,8 @@ metadata: name: etcd-scale exported: true +sequential: true + construction: flow: etcd-scale destruction: diff --git a/pkg/client/flows.go b/pkg/client/flows.go index 04b4251..55df049 100644 --- a/pkg/client/flows.go +++ b/pkg/client/flows.go @@ -46,6 +46,9 @@ type Flow struct { // can only be triggered by other flows (including DEFAULT flow which is exported by-default) Exported bool `json:"exported,omitempty"` + // Flow replicas must be deployed sequentially, one by one + Sequential bool `json:"sequential,omitempty"` + // Parameters that the flow can accept (i.e. valid inputs for the flow) Parameters map[string]FlowParameter `json:"parameters,omitempty"` diff --git a/pkg/resources/flow.go b/pkg/resources/flow.go index 4f7c0ce..9b11f84 100644 --- a/pkg/resources/flow.go +++ b/pkg/resources/flow.go @@ -78,20 +78,13 @@ func (f *flow) buildDependencyGraph(replicaCount int, silent bool) (interfaces.D args[arg] = val } } - fixedNumberOfReplicas := false - if replicaCount > 0 { - fixedNumberOfReplicas = f.context.Graph().Options().FixedNumberOfReplicas - } else if replicaCount == 0 { - fixedNumberOfReplicas = true - replicaCount = -1 - } options := interfaces.DependencyGraphOptions{ FlowName: f.originalName, Args: args, FlowInstanceName: f.context.GetArg("AC_ID"), ReplicaCount: replicaCount, Silent: silent, - FixedNumberOfReplicas: fixedNumberOfReplicas, + FixedNumberOfReplicas: true, } graph, err := f.context.Scheduler().BuildDependencyGraph(options) @@ -131,7 +124,7 @@ func (f *flow) Create() error { // Delete is called during dlow destruction which can happen only once while Create ensures that at least one flow // replica exists, and as such can be called any number of times func (f flow) Delete() error { - graph, err := f.buildDependencyGraph(-1, false) + graph, err := f.buildDependencyGraph(0, false) if err != nil { return err } @@ -145,7 +138,7 @@ func (f flow) Status(meta map[string]string) (interfaces.ResourceStatus, error) graph := f.currentGraph if graph == nil { var err error - graph, err = f.buildDependencyGraph(0, true) + graph, err = f.buildDependencyGraph(-1, true) if err != nil { return interfaces.ResourceError, err } diff --git a/pkg/scheduler/dependency_graph.go b/pkg/scheduler/dependency_graph.go index 737840d..30308bd 100644 --- a/pkg/scheduler/dependency_graph.go +++ b/pkg/scheduler/dependency_graph.go @@ -769,27 +769,29 @@ func expandListExpression(expr string) []string { return result } +type interimGraphVertex struct { + dependency client.Dependency + scheduledResource *ScheduledResource + parentContext *graphContext +} + func (sched *scheduler) fillDependencyGraph(rootContext *graphContext, resDefs map[string]client.ResourceDefinition, dependencies map[string][]client.Dependency, flow *client.Flow, replicas []client.Replica, useDestructionSelector bool) error { - type Block struct { - dependency client.Dependency - scheduledResource *ScheduledResource - parentContext *graphContext - } - blocks := map[string][]*Block{} + var vertices [][]interimGraphVertex silent := rootContext.graph.Options().Silent for _, replica := range replicas { + var replicaVertices []interimGraphVertex replicaName := replica.ReplicaName() replicaContext := sched.prepareContext(rootContext, nil, replicaName) queue := list.New() - queue.PushFront(&Block{dependency: client.Dependency{Child: "flow/" + flow.Name}}) + queue.PushFront(interimGraphVertex{dependency: client.Dependency{Child: "flow/" + flow.Name}}) for e := queue.Front(); e != nil; e = e.Next() { - parent := e.Value.(*Block) + parent := e.Value.(interimGraphVertex) deps := listDependencies(dependencies, parent.dependency.Child, flow, useDestructionSelector, replicaContext) @@ -815,44 +817,149 @@ func (sched *scheduler) fillDependencyGraph(rootContext *graphContext, } sr.usedInReplicas = []string{replicaName} - block := &Block{ + vertex := interimGraphVertex{ scheduledResource: sr, dependency: dep, parentContext: parentContext, } - - blocks[dep.Child] = append(blocks[dep.Child], block) + replicaVertices = append(replicaVertices, vertex) if parent.scheduledResource != nil { sr.Requires = append(sr.Requires, parent.scheduledResource.Key()) parent.scheduledResource.RequiredBy = append(parent.scheduledResource.RequiredBy, sr.Key()) sr.Meta[parent.dependency.Child] = dep.Meta } - queue.PushBack(block) + queue.PushBack(vertex) } } - for _, block := range blocks { - for _, entry := range block { - key := entry.scheduledResource.Key() - existingSr := rootContext.graph.graph[key] - if existingSr == nil { - if !silent { - log.Printf("Adding resource %s to the dependency graph flow %s", key, flow.Name) - } - rootContext.graph.graph[key] = entry.scheduledResource - } else { - sched.updateContext(existingSr.context, entry.parentContext, entry.dependency) - existingSr.Requires = append(existingSr.Requires, entry.scheduledResource.Requires...) - existingSr.RequiredBy = append(existingSr.RequiredBy, entry.scheduledResource.RequiredBy...) - existingSr.usedInReplicas = append(existingSr.usedInReplicas, entry.scheduledResource.usedInReplicas...) - for metaKey, metaValue := range entry.scheduledResource.Meta { - existingSr.Meta[metaKey] = metaValue - } + vertices = append(vertices, replicaVertices) + } + + if flow.Sequential { + sched.concatenateReplicas(vertices, rootContext, rootContext.graph.Options()) + } else { + sched.mergeReplicas(vertices, rootContext, rootContext.graph.Options()) + } + return nil +} + +func (sched *scheduler) mergeReplicas(vertices [][]interimGraphVertex, gc *graphContext, + options interfaces.DependencyGraphOptions) { + + for _, replicaVertices := range vertices { + sched.mergeInterimGraphVertices(replicaVertices, gc.graph.graph, options) + } +} + +func (sched *scheduler) concatenateReplicas(vertices [][]interimGraphVertex, gc *graphContext, + options interfaces.DependencyGraphOptions) { + graph := gc.graph.graph + var previousReplicaGraph map[string]*ScheduledResource + for i, replicaVertices := range vertices { + replicaGraph := map[string]*ScheduledResource{} + sched.mergeInterimGraphVertices(replicaVertices, replicaGraph, options) + + if i > 0 { + correctDuplicateResources(graph, replicaGraph, i) + + for _, leafName := range getLeafs(previousReplicaGraph) { + for _, rootName := range getRoots(replicaGraph) { + root := replicaGraph[rootName] + leaf := previousReplicaGraph[leafName] + root.Requires = append(root.Requires, leafName) + leaf.RequiredBy = append(leaf.RequiredBy, rootName) } } } + previousReplicaGraph = replicaGraph + for key, value := range replicaGraph { + graph[key] = value + } + } +} + +func correctDuplicateResources(existingGraph, newGraph map[string]*ScheduledResource, index int) { + toReplace := map[string]*ScheduledResource{} + for key, sr := range newGraph { + if existingGraph[key] != nil { + toReplace[key] = sr + } + } + for key, sr := range toReplace { + sr.context.id = existingGraph[key].context.id + j := index + 1 + suffix := sr.suffix + for { + sr.suffix = fmt.Sprintf("%s #%d", suffix, j) + if existingGraph[sr.Key()] == nil { + break + } + j++ + } + for _, rKey := range sr.RequiredBy { + requires := newGraph[rKey].Requires + for i, rKey2 := range requires { + if rKey2 == key { + requires[i] = sr.Key() + break + } + } + } + for _, rKey := range sr.Requires { + requiredBy := newGraph[rKey].RequiredBy + for i, rKey2 := range requiredBy { + if rKey2 == key { + requiredBy[i] = sr.Key() + break + } + } + } + delete(newGraph, key) + newGraph[sr.Key()] = sr + } +} + +func getRoots(graph map[string]*ScheduledResource) []string { + var result []string + for key, sr := range graph { + if len(sr.Requires) == 0 { + result = append(result, key) + } + } + return result +} + +func getLeafs(graph map[string]*ScheduledResource) []string { + var result []string + for key, sr := range graph { + if len(sr.RequiredBy) == 0 { + result = append(result, key) + } + } + return result +} + +func (sched *scheduler) mergeInterimGraphVertices(vertices []interimGraphVertex, graph map[string]*ScheduledResource, + options interfaces.DependencyGraphOptions) { + + for _, entry := range vertices { + key := entry.scheduledResource.Key() + existingSr := graph[key] + if existingSr == nil { + if !options.Silent { + log.Printf("Adding resource %s to the dependency graph flow %s", key, options.FlowName) + } + graph[key] = entry.scheduledResource + } else { + sched.updateContext(existingSr.context, entry.parentContext, entry.dependency) + existingSr.Requires = append(existingSr.Requires, entry.scheduledResource.Requires...) + existingSr.RequiredBy = append(existingSr.RequiredBy, entry.scheduledResource.RequiredBy...) + existingSr.usedInReplicas = append(existingSr.usedInReplicas, entry.scheduledResource.usedInReplicas...) + for metaKey, metaValue := range entry.scheduledResource.Meta { + existingSr.Meta[metaKey] = metaValue + } + } } - return nil } // getResourceDestructors builds a list of functions, each of them delete one of replica resources diff --git a/pkg/scheduler/flows_test.go b/pkg/scheduler/flows_test.go index 700e7e0..1bc48aa 100644 --- a/pkg/scheduler/flows_test.go +++ b/pkg/scheduler/flows_test.go @@ -1466,3 +1466,85 @@ func TestDynamicDependencyReplication(t *testing.T) { ensureReplicas(c, t, 7, 1) } + +// TestSequentialReplication tests that resources of sequentially replicated flows create in right order +func TestSequentialReplication(t *testing.T) { + replicaCount := 3 + flow := mocks.MakeFlow("test") + flow.Flow.Sequential = true + + c, fake := mocks.NewClientWithFake( + flow, + mocks.MakeResourceDefinition("pod/ready-$AC_NAME"), + mocks.MakeResourceDefinition("secret/secret"), + mocks.MakeResourceDefinition("job/ready-$AC_NAME"), + mocks.MakeDependency("flow/test", "pod/ready-$AC_NAME", "flow=test"), + mocks.MakeDependency("pod/ready-$AC_NAME", "secret/secret", "flow=test"), + mocks.MakeDependency("secret/secret", "job/ready-$AC_NAME", "flow=test"), + ) + + stopChan := make(chan struct{}) + var deployed []string + fake.PrependReactor("create", "*", + func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) { + resource := action.GetResource().Resource + if resource != "replica" { + deployed = append(deployed, resource) + } + + return false, nil, nil + }) + + depGraph, err := New(c, nil, 0).BuildDependencyGraph( + interfaces.DependencyGraphOptions{ReplicaCount: replicaCount, FlowName: "test"}) + if err != nil { + t.Fatal(err) + } + + graph := depGraph.(*dependencyGraph).graph + if len(graph) != 3*replicaCount { + t.Error("wrong dependency graph length") + } + + depGraph.Deploy(stopChan) + expected := []string{"pods", "secrets", "jobs", "pods", "jobs", "pods", "jobs"} + if len(deployed) != len(expected) { + t.Fatal("invalid resource sequence", deployed) + } + for i, r := range deployed { + if expected[i] != r { + t.Fatal("invalid resource sequence") + } + } + + ensureReplicas(c, t, replicaCount, replicaCount) +} + +// TestSequentialReplicationWithSharedFlow tests that flow consumed as a resource shared by replicas of +// sequentially replicated flow deployed only once +func TestSequentialReplicationWithSharedFlow(t *testing.T) { + replicaCount := 3 + flow := mocks.MakeFlow("outer") + flow.Flow.Sequential = true + + c := mocks.NewClient( + flow, + mocks.MakeFlow("inner"), + mocks.MakeResourceDefinition("job/ready-a$AC_NAME"), + mocks.MakeResourceDefinition("job/ready-b$AC_NAME"), + mocks.MakeDependency("flow/outer", "flow/inner", "flow=outer"), + mocks.MakeDependency("flow/inner", "job/ready-a$AC_NAME", "flow=outer"), + mocks.MakeDependency("flow/inner", "job/ready-b$AC_NAME", "flow=inner"), + ) + + stopChan := make(chan struct{}) + + depGraph, err := New(c, nil, 0).BuildDependencyGraph( + interfaces.DependencyGraphOptions{ReplicaCount: replicaCount, FlowName: "outer"}) + if err != nil { + t.Fatal(err) + } + + depGraph.Deploy(stopChan) + ensureReplicas(c, t, replicaCount+1, replicaCount+1) +} From 2a0771b1d722c17bfdb1fba44b1023c5a7d1e1dd Mon Sep 17 00:00:00 2001 From: Stan Lagun Date: Thu, 25 May 2017 17:50:49 -0700 Subject: [PATCH 3/3] Better handling of deployment cancellation * stopChan is now passed to the graph finalizers so that deployment can be canceled on the final stages * never write to stopChan. The only correct way to cancel deployment is to close the channel * pass nil instead of real chanel for unit tests that do not cancel deployment --- pkg/scheduler/controller.go | 1 - pkg/scheduler/dependency_graph.go | 41 +++++----- pkg/scheduler/flows_test.go | 122 +++++++++++------------------- pkg/scheduler/frontend.go | 9 +-- pkg/scheduler/scheduler.go | 22 ++++-- pkg/scheduler/scheduler_test.go | 10 +-- 6 files changed, 88 insertions(+), 117 deletions(-) diff --git a/pkg/scheduler/controller.go b/pkg/scheduler/controller.go index f92a733..264e398 100644 --- a/pkg/scheduler/controller.go +++ b/pkg/scheduler/controller.go @@ -140,7 +140,6 @@ func deployTasks(taskQueue *list.List, mutex *sync.Mutex, cond *sync.Cond, clien cond.L.Unlock() if processing != nil { if abortChan != nil { - abortChan <- struct{}{} close(abortChan) abortChan = nil processing = nil diff --git a/pkg/scheduler/dependency_graph.go b/pkg/scheduler/dependency_graph.go index 30308bd..634c8dc 100644 --- a/pkg/scheduler/dependency_graph.go +++ b/pkg/scheduler/dependency_graph.go @@ -37,7 +37,7 @@ type dependencyGraph struct { graph map[string]*ScheduledResource scheduler *scheduler graphOptions interfaces.DependencyGraphOptions - finalizer func() + finalizer func(stopChan <-chan struct{}) } type graphContext struct { @@ -963,9 +963,10 @@ func (sched *scheduler) mergeInterimGraphVertices(vertices []interimGraphVertex, } // getResourceDestructors builds a list of functions, each of them delete one of replica resources -func getResourceDestructors(construction, destruction *dependencyGraph, replicaMap map[string]client.Replica, failed *chan *ScheduledResource) []func() bool { - var destructors []func() bool +func getResourceDestructors(construction, destruction *dependencyGraph, replicaMap map[string]client.Replica, + failed *chan *ScheduledResource) []func(<-chan struct{}) bool { + var destructors []func(<-chan struct{}) bool for _, depGraph := range [2]*dependencyGraph{construction, destruction} { for _, resource := range depGraph.graph { resourceCanBeDeleted := true @@ -983,8 +984,8 @@ func getResourceDestructors(construction, destruction *dependencyGraph, replicaM return destructors } -func getDestructorFunc(resource *ScheduledResource, failed *chan *ScheduledResource) func() bool { - return func() bool { +func getDestructorFunc(resource *ScheduledResource, failed *chan *ScheduledResource) func(<-chan struct{}) bool { + return func(<-chan struct{}) bool { res := deleteResource(resource) if res != nil { *failed <- resource @@ -995,10 +996,12 @@ func getDestructorFunc(resource *ScheduledResource, failed *chan *ScheduledResou } // deleteReplicaResources invokes resources destructors and deletes replicas for which 100% of resources were deleted -func deleteReplicaResources(sched *scheduler, destructors []func() bool, replicaMap map[string]client.Replica, failed *chan *ScheduledResource) { +func deleteReplicaResources(sched *scheduler, destructors []func(<-chan struct{}) bool, replicaMap map[string]client.Replica, + failed *chan *ScheduledResource, stopChan <-chan struct{}) { + *failed = make(chan *ScheduledResource, len(destructors)) defer close(*failed) - deleted := runConcurrently(destructors, sched.concurrency) + deleted := runConcurrently(destructors, sched.concurrency, stopChan) failedReplicas := map[string]bool{} if !deleted { log.Println("Some of resources were not deleted") @@ -1015,7 +1018,7 @@ readFailed: break readFailed } } - var deleteReplicaFuncs []func() bool + var deleteReplicaFuncs []func(<-chan struct{}) bool for replicaName, replicaObject := range replicaMap { if _, found := failedReplicas[replicaName]; found { @@ -1023,7 +1026,7 @@ readFailed: } replicaNameCopy := replicaName replicaObjectCopy := replicaObject - deleteReplicaFuncs = append(deleteReplicaFuncs, func() bool { + deleteReplicaFuncs = append(deleteReplicaFuncs, func(<-chan struct{}) bool { log.Printf("%s flow: Deleting replica %s", replicaObjectCopy.FlowName, replicaNameCopy) err := sched.client.Replicas().Delete(replicaObjectCopy.Name) if err != nil { @@ -1033,12 +1036,12 @@ readFailed: }) } - if deleteReplicaFuncs != nil && !runConcurrently(deleteReplicaFuncs, sched.concurrency) { + if deleteReplicaFuncs != nil && !runConcurrently(deleteReplicaFuncs, sched.concurrency, stopChan) { log.Println("Some of flow replicas were not deleted") } } -func (sched *scheduler) composeDeletingFinalizer(construction, destruction *dependencyGraph, replicas []client.Replica) func() { +func (sched *scheduler) composeDeletingFinalizer(construction, destruction *dependencyGraph, replicas []client.Replica) func(<-chan struct{}) { replicaMap := map[string]client.Replica{} for _, replica := range replicas { replicaMap[replica.ReplicaName()] = replica @@ -1047,14 +1050,14 @@ func (sched *scheduler) composeDeletingFinalizer(construction, destruction *depe var failed chan *ScheduledResource destructors := getResourceDestructors(construction, destruction, replicaMap, &failed) - return func() { + return func(stopChan <-chan struct{}) { log.Print("Performing resource cleanup") - deleteReplicaResources(sched, destructors, replicaMap, &failed) + deleteReplicaResources(sched, destructors, replicaMap, &failed, stopChan) } } -func makeAcknowledgeReplicaFunc(replica client.Replica, api client.ReplicasInterface) func() bool { - return func() bool { +func makeAcknowledgeReplicaFunc(replica client.Replica, api client.ReplicasInterface) func(<-chan struct{}) bool { + return func(<-chan struct{}) bool { replica.Deployed = true log.Printf("%s flow: Marking replica %s as deployed", replica.FlowName, replica.ReplicaName()) if err := api.Update(&replica); err != nil { @@ -1065,16 +1068,16 @@ func makeAcknowledgeReplicaFunc(replica client.Replica, api client.ReplicasInter } } -func (sched *scheduler) composeAcknowledgingFinalizer(replicas []client.Replica) func() { - var funcs []func() bool +func (sched *scheduler) composeAcknowledgingFinalizer(replicas []client.Replica) func(<-chan struct{}) { + var funcs []func(<-chan struct{}) bool for _, replica := range replicas { if !replica.Deployed { funcs = append(funcs, makeAcknowledgeReplicaFunc(replica, sched.client.Replicas())) } } - return func() { - if !runConcurrently(funcs, sched.concurrency) { + return func(stopChan <-chan struct{}) { + if !runConcurrently(funcs, sched.concurrency, stopChan) { log.Println("Some of the replicas were not updated!") } } diff --git a/pkg/scheduler/flows_test.go b/pkg/scheduler/flows_test.go index 1bc48aa..5713179 100644 --- a/pkg/scheduler/flows_test.go +++ b/pkg/scheduler/flows_test.go @@ -144,8 +144,7 @@ func TestTriggerFlowIndependently(t *testing.T) { t.Fatal("Job list is not empty") } - stopChan := make(chan struct{}) - depGraph.Deploy(stopChan) + depGraph.Deploy(nil) jobs, err = c.Jobs().List(api_v1.ListOptions{}) if err != nil { @@ -217,8 +216,7 @@ func TestTriggerOneFlowFromAnother(t *testing.T) { t.Fatal(err) } - stopChan := make(chan struct{}) - depGraph.Deploy(stopChan) + depGraph.Deploy(nil) jobs, err := c.Jobs().List(api_v1.ListOptions{}) if err != nil { @@ -262,8 +260,7 @@ func TestParameterPassing(t *testing.T) { t.Fatal(err) } - stopChan := make(chan struct{}) - depGraph.Deploy(stopChan) + depGraph.Deploy(nil) jobs, err := c.Jobs().List(api_v1.ListOptions{}) if err != nil { @@ -306,8 +303,7 @@ func TestMultipathParameterPassing(t *testing.T) { t.Fatal(err) } - stopChan := make(chan struct{}) - depGraph.Deploy(stopChan) + depGraph.Deploy(nil) jobs, err := c.Jobs().List(api_v1.ListOptions{}) if err != nil { @@ -354,8 +350,7 @@ func TestParametrizedFlow(t *testing.T) { t.Fatal(err) } - stopChan := make(chan struct{}) - depGraph.Deploy(stopChan) + depGraph.Deploy(nil) jobs, err := c.Jobs().List(api_v1.ListOptions{}) if err != nil { @@ -396,8 +391,7 @@ func TestAcNameParameter(t *testing.T) { t.Fatal(err) } - stopChan := make(chan struct{}) - depGraph.Deploy(stopChan) + depGraph.Deploy(nil) replicas, err := c.Replicas().List(api.ListOptions{}) if err != nil { @@ -444,8 +438,7 @@ func TestUseUndeclaredFlowParameter(t *testing.T) { t.Fatal(err) } - stopChan := make(chan struct{}) - depGraph.Deploy(stopChan) + depGraph.Deploy(nil) jobs, err := c.Jobs().List(api_v1.ListOptions{}) if err != nil { @@ -538,8 +531,7 @@ func TestParameterPassingBetweenFlows(t *testing.T) { t.Fatal(err) } - stopChan := make(chan struct{}) - depGraph.Deploy(stopChan) + depGraph.Deploy(nil) jobs, err := c.Jobs().List(api_v1.ListOptions{}) if err != nil { @@ -596,8 +588,7 @@ func TestReplication(t *testing.T) { t.Fatal(err) } - stopChan := make(chan struct{}) - depGraph.Deploy(stopChan) + depGraph.Deploy(nil) jobs := ensureReplicas(c, t, 2*replicaCount, replicaCount) @@ -625,8 +616,7 @@ func TestReplicationWithSharedResources(t *testing.T) { t.Fatal(err) } - stopChan := make(chan struct{}) - depGraph.Deploy(stopChan) + depGraph.Deploy(nil) jobs := ensureReplicas(c, t, replicaCount+1, replicaCount) @@ -664,8 +654,7 @@ func TestReplicationScaleUp(t *testing.T) { t.Fatal(err) } - stopChan := make(chan struct{}) - depGraph.Deploy(stopChan) + depGraph.Deploy(nil) ensureReplicas(c, t, initialReplicaCount, initialReplicaCount) @@ -679,7 +668,7 @@ func TestReplicationScaleUp(t *testing.T) { t.Fatal(err) } - depGraph.Deploy(stopChan) + depGraph.Deploy(nil) ensureReplicas(c, t, adjustedReplicaCount, adjustedReplicaCount) @@ -693,7 +682,7 @@ func TestReplicationScaleUp(t *testing.T) { t.Fatal(err) } - depGraph.Deploy(stopChan) + depGraph.Deploy(nil) jobs := ensureReplicas(c, t, adjustedReplicaCount+replicaCountDelta, adjustedReplicaCount+replicaCountDelta) @@ -718,8 +707,7 @@ func TestNoOpReplication(t *testing.T) { t.Fatal(err) } - stopChan := make(chan struct{}) - depGraph.Deploy(stopChan) + depGraph.Deploy(nil) ensureReplicas(c, t, 0, 0) } @@ -749,8 +737,7 @@ func TestCompositionFlowReplication(t *testing.T) { t.Fatal(err) } - stopChan := make(chan struct{}) - depGraph.Deploy(stopChan) + depGraph.Deploy(nil) jobs := ensureReplicas(c, t, 4*replicaCount, 2*replicaCount) @@ -802,8 +789,7 @@ func TestDestruction(t *testing.T) { t.Fatal(err) } - stopChan := make(chan struct{}) - depGraph.Deploy(stopChan) + depGraph.Deploy(nil) ensureReplicas(c, t, 6, 5) @@ -812,7 +798,7 @@ func TestDestruction(t *testing.T) { if err != nil { t.Fatal(err) } - depGraph.Deploy(stopChan) + depGraph.Deploy(nil) ensureReplicas(c, t, 4, 3) @@ -821,7 +807,7 @@ func TestDestruction(t *testing.T) { if err != nil { t.Fatal(err) } - depGraph.Deploy(stopChan) + depGraph.Deploy(nil) jobs := ensureReplicas(c, t, 2, 1) if jobs[0].Name != "ready-b" && jobs[1].Name != "ready-b" { @@ -833,7 +819,7 @@ func TestDestruction(t *testing.T) { if err != nil { t.Fatal(err) } - depGraph.Deploy(stopChan) + depGraph.Deploy(nil) ensureReplicas(c, t, 0, 0) } @@ -859,8 +845,7 @@ func TestCompositeFlowDestruction(t *testing.T) { t.Fatal(err) } - stopChan := make(chan struct{}) - depGraph.Deploy(stopChan) + depGraph.Deploy(nil) ensureReplicas(c, t, 15, 10) @@ -869,7 +854,7 @@ func TestCompositeFlowDestruction(t *testing.T) { if err != nil { t.Fatal(err) } - depGraph.Deploy(stopChan) + depGraph.Deploy(nil) ensureReplicas(c, t, 9, 6) @@ -878,7 +863,7 @@ func TestCompositeFlowDestruction(t *testing.T) { if err != nil { t.Fatal(err) } - depGraph.Deploy(stopChan) + depGraph.Deploy(nil) ensureReplicas(c, t, 3, 2) @@ -887,7 +872,7 @@ func TestCompositeFlowDestruction(t *testing.T) { if err != nil { t.Fatal(err) } - depGraph.Deploy(stopChan) + depGraph.Deploy(nil) ensureReplicas(c, t, 0, 0) } @@ -944,8 +929,7 @@ func TestCleanupResources(t *testing.T) { t.Fatal(err) } - stopChan := make(chan struct{}) - depGraph.Deploy(stopChan) + depGraph.Deploy(nil) ensureReplicas(c, t, 1, 1) if !(aCreated && !aDeleted && !bCreated && !bDeleted && !cCreated && !cDeleted) { @@ -958,7 +942,7 @@ func TestCleanupResources(t *testing.T) { t.Fatal(err) } - depGraph.Deploy(stopChan) + depGraph.Deploy(nil) ensureReplicas(c, t, 0, 0) if !(aCreated && aDeleted && bCreated && bDeleted && cCreated && cDeleted) { @@ -991,8 +975,7 @@ func TestSharedReplicaSpace(t *testing.T) { t.Fatal(err) } - stopChan := make(chan struct{}) - depGraph.Deploy(stopChan) + depGraph.Deploy(nil) ensureReplicas(c, t, 2, 1) @@ -1002,7 +985,7 @@ func TestSharedReplicaSpace(t *testing.T) { t.Fatal(err) } - depGraph.Deploy(stopChan) + depGraph.Deploy(nil) ensureReplicas(c, t, 0, 0) @@ -1012,7 +995,7 @@ func TestSharedReplicaSpace(t *testing.T) { t.Fatal(err) } - depGraph.Deploy(stopChan) + depGraph.Deploy(nil) ensureReplicas(c, t, 2, 1) @@ -1022,7 +1005,7 @@ func TestSharedReplicaSpace(t *testing.T) { t.Fatal(err) } - depGraph.Deploy(stopChan) + depGraph.Deploy(nil) ensureReplicas(c, t, 0, 0) } @@ -1043,8 +1026,7 @@ func TestDeleteExistingResources(t *testing.T) { t.Fatal(err) } - stopChan := make(chan struct{}) - depGraph.Deploy(stopChan) + depGraph.Deploy(nil) ensureReplicas(c, t, 1, 1) @@ -1053,7 +1035,7 @@ func TestDeleteExistingResources(t *testing.T) { if err != nil { t.Fatal(err) } - depGraph.Deploy(stopChan) + depGraph.Deploy(nil) ensureReplicas(c, t, 1, 0, "invalid attempt to delete external resources") @@ -1063,7 +1045,7 @@ func TestDeleteExistingResources(t *testing.T) { t.Fatal(err) } - depGraph.Deploy(stopChan) + depGraph.Deploy(nil) ensureReplicas(c, t, 1, 1) @@ -1072,7 +1054,7 @@ func TestDeleteExistingResources(t *testing.T) { if err != nil { t.Fatal(err) } - depGraph.Deploy(stopChan) + depGraph.Deploy(nil) ensureReplicas(c, t, 0, 0, "valid attempt to delete external resources") } @@ -1103,8 +1085,7 @@ func TestCleanupResourcesErrorHandling(t *testing.T) { t.Fatal(err) } - stopChan := make(chan struct{}) - depGraph.Deploy(stopChan) + depGraph.Deploy(nil) ensureReplicas(c, t, 2, 2) @@ -1114,7 +1095,7 @@ func TestCleanupResourcesErrorHandling(t *testing.T) { t.Fatal(err) } - depGraph.Deploy(stopChan) + depGraph.Deploy(nil) ensureReplicas(c, t, 1, 1) } @@ -1137,7 +1118,7 @@ func TestDeploymentRecoveryForRelativeReplicaCount(t *testing.T) { func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) { if !prevented { prevented = true - stopChan <- struct{}{} + close(stopChan) return true, nil, errors.New("resource cannot be created") } return false, nil, nil @@ -1166,7 +1147,7 @@ func TestDeploymentRecoveryForRelativeReplicaCount(t *testing.T) { t.Fatal(err) } - depGraph.Deploy(stopChan) + depGraph.Deploy(nil) ensureReplicas(c, t, 1, 2) replicas, _ = c.Replicas().List(api.ListOptions{}) for _, r := range replicas.Items { @@ -1194,9 +1175,7 @@ func TestMultiParentFlow(t *testing.T) { t.Fatal(err) } - stopChan := make(chan struct{}) - depGraph.Deploy(stopChan) - + depGraph.Deploy(nil) ensureReplicas(c, t, 5, 3) } @@ -1219,8 +1198,7 @@ func TestMultiParentFlowWithSuffix(t *testing.T) { t.Fatal(err) } - stopChan := make(chan struct{}) - depGraph.Deploy(stopChan) + depGraph.Deploy(nil) ensureReplicas(c, t, 6, 4) } @@ -1254,8 +1232,7 @@ func TestMultipleFlowCalls(t *testing.T) { t.Fatal(err) } - stopChan := make(chan struct{}) - depGraph.Deploy(stopChan) + depGraph.Deploy(nil) jobs := ensureReplicas(c, t, 4, 3) jobNames := map[string]bool{ @@ -1304,8 +1281,7 @@ func TestMultipathParameterPassingWithSuffix(t *testing.T) { t.Fatal(err) } - stopChan := make(chan struct{}) - depGraph.Deploy(stopChan) + depGraph.Deploy(nil) jobs, err := c.Jobs().List(api_v1.ListOptions{}) if err != nil { @@ -1386,8 +1362,7 @@ func TestConsumeReplicatedFlow(t *testing.T) { if err != nil { t.Fatal(err) } - stopChan := make(chan struct{}) - depGraph.Deploy(stopChan) + depGraph.Deploy(nil) ensureReplicas(c, t, 2*3, 3*2+2) } @@ -1410,8 +1385,7 @@ func TestComplexDependencyReplication(t *testing.T) { if err != nil { t.Fatal(err) } - stopChan := make(chan struct{}) - depGraph.Deploy(stopChan) + depGraph.Deploy(nil) expectedJobNames := map[string]bool{ "ready-1-a": true, @@ -1461,8 +1435,7 @@ func TestDynamicDependencyReplication(t *testing.T) { if err != nil { t.Fatal(err) } - stopChan := make(chan struct{}) - depGraph.Deploy(stopChan) + depGraph.Deploy(nil) ensureReplicas(c, t, 7, 1) } @@ -1483,7 +1456,6 @@ func TestSequentialReplication(t *testing.T) { mocks.MakeDependency("secret/secret", "job/ready-$AC_NAME", "flow=test"), ) - stopChan := make(chan struct{}) var deployed []string fake.PrependReactor("create", "*", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) { @@ -1506,7 +1478,7 @@ func TestSequentialReplication(t *testing.T) { t.Error("wrong dependency graph length") } - depGraph.Deploy(stopChan) + depGraph.Deploy(nil) expected := []string{"pods", "secrets", "jobs", "pods", "jobs", "pods", "jobs"} if len(deployed) != len(expected) { t.Fatal("invalid resource sequence", deployed) @@ -1537,14 +1509,12 @@ func TestSequentialReplicationWithSharedFlow(t *testing.T) { mocks.MakeDependency("flow/inner", "job/ready-b$AC_NAME", "flow=inner"), ) - stopChan := make(chan struct{}) - depGraph, err := New(c, nil, 0).BuildDependencyGraph( interfaces.DependencyGraphOptions{ReplicaCount: replicaCount, FlowName: "outer"}) if err != nil { t.Fatal(err) } - depGraph.Deploy(stopChan) + depGraph.Deploy(nil) ensureReplicas(c, t, replicaCount+1, replicaCount+1) } diff --git a/pkg/scheduler/frontend.go b/pkg/scheduler/frontend.go index dffcd92..d49973b 100644 --- a/pkg/scheduler/frontend.go +++ b/pkg/scheduler/frontend.go @@ -161,14 +161,7 @@ func Deploy(sched interfaces.Scheduler, options interfaces.DependencyGraphOption if err != nil { return "", err } - var ch <-chan struct{} - if stopChan == nil { - ch := make(chan struct{}) - defer close(ch) - } else { - ch = stopChan - } - depGraph.Deploy(ch) + depGraph.Deploy(stopChan) } else { log.Printf("Scheduling deployment of %s flow", options.FlowName) var err error diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 43e0031..2970aee 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -353,7 +353,7 @@ func (depGraph dependencyGraph) Deploy(stopChan <-chan struct{}) { } } if depGraph.finalizer != nil { - depGraph.finalizer() + depGraph.finalizer(stopChan) } // TODO Make sure every KO gets created eventually } @@ -418,7 +418,7 @@ func (depGraph dependencyGraph) GetStatus() (interfaces.DeploymentStatus, interf return status, deploymentReport } -func runConcurrently(funcs []func() bool, concurrency int) bool { +func runConcurrently(funcs []func(<-chan struct{}) bool, concurrency int, stopChan <-chan struct{}) bool { if concurrency < 1 { concurrency = len(funcs) } @@ -430,12 +430,18 @@ func runConcurrently(funcs []func() bool, concurrency int) bool { for _, f := range funcs { sem <- true - go func(foo func() bool) { - defer func() { <-sem }() - if !foo() { - result = false - } - }(f) + select { + case <-stopChan: + result = false + <-sem + default: + go func(foo func(<-chan struct{}) bool) { + defer func() { <-sem }() + if !foo(stopChan) { + result = false + } + }(f) + } } for i := 0; i < cap(sem); i++ { diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index 2176b3f..f4117ad 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -546,18 +546,18 @@ func TestGraph(t *testing.T) { } } -func makeTaskFunc(i int32, res bool, acc *int32) func() bool { - return func() bool { +func makeTaskFunc(i int32, res bool, acc *int32) func(<-chan struct{}) bool { + return func(<-chan struct{}) bool { time.Sleep(time.Second / 10) atomic.AddInt32(acc, i) return res } } -func runTaskFuncs(t *testing.T, funcs []func() bool, concurrency int, expectedSum int32, expectedResult bool, threshold float64, acc *int32) { +func runTaskFuncs(t *testing.T, funcs []func(<-chan struct{}) bool, concurrency int, expectedSum int32, expectedResult bool, threshold float64, acc *int32) { *acc = 0 before := time.Now() - res := runConcurrently(funcs, concurrency) + res := runConcurrently(funcs, concurrency, nil) after := time.Now() if *acc != expectedSum { t.Errorf("runConcurrently failed: %d != %d", expectedSum, acc) @@ -576,7 +576,7 @@ func runTaskFuncs(t *testing.T, funcs []func() bool, concurrency int, expectedSu // TestRunConcurrently tests runConcurrently function which runs list of concurrent tasks func TestRunConcurrently(t *testing.T) { var acc int32 - var funcs []func() bool + var funcs []func(<-chan struct{}) bool for i := int32(1); i <= 50; i++ { funcs = append(funcs, makeTaskFunc(i, true, &acc)) }