diff --git a/docs/flows.md b/docs/flows.md index 9d42583..eb53890 100644 --- a/docs/flows.md +++ b/docs/flows.md @@ -63,6 +63,7 @@ destruction: replicaSpace: optional-name exported: true +sequential: true parameters: parameterName1: @@ -195,7 +196,7 @@ my-flow -> -> pod/pod-$arg ``` will create two pods: `pod-a` and `pod-b`. -## Replication of flows +## Replication Flow replication is an AppController feature that makes specified number of flow graph copies, each one with a unique name and then merges them into a single graph. Because each replica name may be used in some of resource @@ -234,6 +235,96 @@ If there were 7 of them, 4 replicas would be deleted.\ `kubeac run my-flow` if there are no replicas exist, create one, otherwise validate status of resources of existing replicas. +### Replication of dependencies + +With commandline parameters one can create number of flow replicas. But sometimes there is a need to have flow +that creates several replicas of another flow, or just several resources with the same specification that differ +only in name. + +One possible solution is to utilize technique shown above: make parameter value be part of resource name and +then duplicate the dependency that leads to this resource and pass different parameter value along each of +dependencies. This works well for small and fixed number of replicas. But if the number goes big, it becomes hard +to manage such number of dependency objects. Moreover if the number itself is not fixed but rather passed as a +parameter replicating resource by manual replication of dependencies becomes impossible. + +Luckily, the dependencies can be automatically replicated. This is done through the `generateFor` field of the +`Dependency` object. `generateFor` is a map where keys are argument names and values are list expressions. Each list +expression is comma-separated list of values. If the value has a form of `number..number`, it is expended into a +list of integers in the given range. For example `"1..3, 10..11, abc"` will turn into `["1", "2", "3", "10", "11", "abc"]`. +Then the dependency is going to be replicated automatically with each replica getting on of the list values as an +additional argument. There can be several `generateFor` arguments. In this case there is going to be one dependency +for each combination of the list values. For example, + +```YAML +apiVersion: appcontroller.k8s/v1alpha1 +kind: Dependency +metadata: + name: dependency +parent: pod/podName +child: flow/flowName-$x-$y +generateFor: + x: 1..2 + y: a, b +``` + +has the same effect as + +```YAML +apiVersion: appcontroller.k8s/v1alpha1 +kind: Dependency +metadata: + name: dependency1 +parent: pod/podName +child: flow/flowName-$x-$y +args: + x: 1 + y: a +--- +apiVersion: appcontroller.k8s/v1alpha1 +kind: Dependency +metadata: + name: dependency2 +parent: pod/podName +child: flow/flowName-$x-$y +args: + x: 2 + y: a +--- +apiVersion: appcontroller.k8s/v1alpha1 +kind: Dependency +metadata: + name: dependency3 +parent: pod/podName +child: flow/flowName-$x-$y +args: + x: 1 + y: b +--- +apiVersion: appcontroller.k8s/v1alpha1 +kind: Dependency +metadata: + name: dependency4 +parent: pod/podName +child: flow/flowName-$x-$y +args: + x: 2 + y: b +``` + +Besides simplifying the dependency graph, dependency replication makes possible to have dynamic number of replicas +by using parameter value right inside the list expressions: + +```YAML +apiVersion: appcontroller.k8s/v1alpha1 +kind: Dependency +metadata: + name: dependency +parent: pod/podName +child: flow/flowName-$index +generateFor: + index: 1..$replicaCount +``` + ### Replica-spaces and contexts Replica-space, is a tag that all replicas of the flow share. When new `Replica` object for the flow is created, @@ -261,6 +352,14 @@ another flow will "see" only its own replicas so the `Flow` resource can always However, when the flow is run independently, it will not have any context and thus query replicas based on replica-space alone, which means it will get all the replicas from all contexts. +### Sequential flows + +By default, if flow has more than one replica, generated dependency graph would have each replica subgraph attached +to the graph root vertex (the `Flow` vertex). When deployed, resources of all replicas are going to be created in +parallel. However, in some cases it is desired that replicas be deployed sequentially, one by one. This can be achieved +by setting `sequential` attribute of the `Flow` to `true`. For sequential flows each replica roots get attached to the +leaf vertices of previous one. + ## Scheduling flow deployments When user runs `kubeac run something` the deployment does not happen immediately (unless there is also a `--deploy` diff --git a/e2e/basic_test.go b/e2e/basic_test.go index c039163..2b3cfc1 100644 --- a/e2e/basic_test.go +++ b/e2e/basic_test.go @@ -56,7 +56,9 @@ var _ = Describe("Basic Suite", func() { }, } childPod := PodPause("child-pod") - framework.Connect(framework.WrapAndCreate(parentPod), framework.WrapAndCreate(childPod)) + framework.Connect( + framework.WrapWithMetaAndCreate(parentPod, map[string]interface{}{"timeout": 30}), + framework.WrapAndCreate(childPod)) framework.Run() testutils.WaitForPod(framework.Clientset, framework.Namespace.Name, parentPod.Name, "") time.Sleep(time.Second) @@ -104,7 +106,7 @@ var _ = Describe("Basic Suite", func() { By("Creating resource definition with single pod") pod1 := PodPause("pod1") framework.WrapAndCreate(pod1) - framework.Run() + framework.RunAsynchronously() framework.DeleteAppControllerPod() By("Verify that pod is consistently not found") Consistently(func() bool { diff --git a/e2e/example_runner.go b/e2e/example_runner.go index 06001a5..8f98139 100644 --- a/e2e/example_runner.go +++ b/e2e/example_runner.go @@ -104,14 +104,10 @@ func (f *examplesFramework) handleListCreation(ustList *runtime.UnstructuredList } } -func (f *examplesFramework) VerifyStatus(task string, options interfaces.DependencyGraphOptions) { +func (f *examplesFramework) VerifyStatus(options interfaces.DependencyGraphOptions) { var depReport report.DeploymentReport Eventually( func() bool { - _, err := f.Client.ConfigMaps().Get(task) - if err == nil { - return false - } status, r, err := scheduler.GetStatus(f.Client, nil, options) if err != nil { return false @@ -127,7 +123,7 @@ func (f *examplesFramework) CreateRunAndVerify(exampleName string, options inter By("Creating example " + exampleName) f.CreateExample(exampleName) By("Running appcontroller scheduler") - task := f.RunWithOptions(options) + f.RunWithOptions(options) By("Verifying status of deployment for example " + exampleName) - f.VerifyStatus(task, options) + f.VerifyStatus(options) } diff --git a/e2e/flows_test.go b/e2e/flows_test.go index 72ab376..d0f84fd 100644 --- a/e2e/flows_test.go +++ b/e2e/flows_test.go @@ -64,9 +64,9 @@ var _ = Describe("Flows Suite", func() { deleteOptions := interfaces.DependencyGraphOptions{ReplicaCount: 0, FixedNumberOfReplicas: true} By("Running appcontroller scheduler") - task := framework.RunWithOptions(deleteOptions) + framework.RunWithOptions(deleteOptions) By("Verifying status of deployment") - framework.VerifyStatus(task, deleteOptions) + framework.VerifyStatus(deleteOptions) framework.validateResourceCounts([]resourceCount{ {"replicas", "", false, 0}, diff --git a/e2e/utils/appcmanager.go b/e2e/utils/appcmanager.go index 266faf1..1e82e80 100644 --- a/e2e/utils/appcmanager.go +++ b/e2e/utils/appcmanager.go @@ -44,17 +44,38 @@ type AppControllerManager struct { } // Run runs dependency graph deployment with default settings -func (a *AppControllerManager) Run() string { - return a.RunWithOptions(interfaces.DependencyGraphOptions{MinReplicaCount: 1}) +func (a *AppControllerManager) Run() { + a.runDeployment(false, interfaces.DependencyGraphOptions{MinReplicaCount: 1}) +} + +// RunAsynchronously runs dependency graph deployment with default settings without waiting for deployment to complete +func (a *AppControllerManager) RunAsynchronously() { + a.runDeployment(true, interfaces.DependencyGraphOptions{MinReplicaCount: 1}) } // RunWithOptions runs dependency graph deployment with given settings -func (a *AppControllerManager) RunWithOptions(options interfaces.DependencyGraphOptions) string { +func (a *AppControllerManager) RunWithOptions(options interfaces.DependencyGraphOptions) { + a.runDeployment(false, options) +} + +// RunAsynchronouslyWithOptions runs dependency graph deployment with given settings without waiting for deployment to complete +func (a *AppControllerManager) RunAsynchronouslyWithOptions(options interfaces.DependencyGraphOptions) { + a.runDeployment(true, options) +} + +func (a *AppControllerManager) runDeployment(runAsync bool, options interfaces.DependencyGraphOptions) { sched := scheduler.New(a.Client, nil, 0) task, err := scheduler.Deploy(sched, options, false, nil) Expect(err).NotTo(HaveOccurred()) - return task + if !runAsync { + Eventually( + func() error { + _, err := a.Client.ConfigMaps().Get(task) + return err + }, + 300*time.Second, 5*time.Second).Should(HaveOccurred(), "Deployment job wasn't completed") + } } // DeleteAppControllerPod deletes pod, where AppController is running diff --git a/examples/etcd/README.md b/examples/etcd/README.md index cd945de..22fe40c 100644 --- a/examples/etcd/README.md +++ b/examples/etcd/README.md @@ -26,8 +26,7 @@ If omitted, `etcd` name is used by default. `kubectl exec k8s-appcontroller kubeac run etcd-scale -n +1 --arg clusterName=my-cluster` `-n +1` - adds one node to the cluster. Use `-n -1` to scale the cluster down by one node. In this case the last -added node is going to be deleted. At the moment it is only possible to scale cluster up by one node at a time. -However, any number of nodes can be removed. Note, that this can also remove nodes created upon initial deployment. +added node is going to be deleted. This flow can also remove nodes created upon initial deployment. `--arg clusterName=my-cluster` - name of the cluster to scale (`etcd` if not specified). diff --git a/examples/etcd/resdefs/scale-flow.yaml b/examples/etcd/resdefs/scale-flow.yaml index 4394b4b..4fee091 100644 --- a/examples/etcd/resdefs/scale-flow.yaml +++ b/examples/etcd/resdefs/scale-flow.yaml @@ -4,6 +4,8 @@ metadata: name: etcd-scale exported: true +sequential: true + construction: flow: etcd-scale destruction: diff --git a/pkg/client/dependencies.go b/pkg/client/dependencies.go index 075b48d..5b72158 100644 --- a/pkg/client/dependencies.go +++ b/pkg/client/dependencies.go @@ -41,6 +41,9 @@ type Dependency struct { // Arguments passed to dependent resource Args map[string]string `json:"args,omitempty"` + + // map of variable name -> list expression. New dependencies are generated by replication and iteration over those lists + GenerateFor map[string]string `json:"generateFor,omitempty"` } // DependencyList is a k8s object representing list of dependencies diff --git a/pkg/client/flows.go b/pkg/client/flows.go index 04b4251..55df049 100644 --- a/pkg/client/flows.go +++ b/pkg/client/flows.go @@ -46,6 +46,9 @@ type Flow struct { // can only be triggered by other flows (including DEFAULT flow which is exported by-default) Exported bool `json:"exported,omitempty"` + // Flow replicas must be deployed sequentially, one by one + Sequential bool `json:"sequential,omitempty"` + // Parameters that the flow can accept (i.e. valid inputs for the flow) Parameters map[string]FlowParameter `json:"parameters,omitempty"` diff --git a/pkg/interfaces/interfaces.go b/pkg/interfaces/interfaces.go index 384c010..c470831 100644 --- a/pkg/interfaces/interfaces.go +++ b/pkg/interfaces/interfaces.go @@ -72,7 +72,7 @@ type DeploymentReport interface { // DependencyGraph represents operations on dependency graph type DependencyGraph interface { GetStatus() (DeploymentStatus, DeploymentReport) - Deploy(<-chan struct{}) + Deploy(<-chan struct{}) bool Options() DependencyGraphOptions } @@ -82,7 +82,6 @@ type GraphContext interface { Scheduler() Scheduler GetArg(string) string Graph() DependencyGraph - Dependency() *client.Dependency } // DependencyGraphOptions contains all the input required to build a dependency graph diff --git a/pkg/resources/flow.go b/pkg/resources/flow.go index 564801f..9b11f84 100644 --- a/pkg/resources/flow.go +++ b/pkg/resources/flow.go @@ -15,9 +15,7 @@ package resources import ( - "fmt" "log" - "strings" "github.com/Mirantis/k8s-AppController/pkg/client" "github.com/Mirantis/k8s-AppController/pkg/interfaces" @@ -29,7 +27,6 @@ type flow struct { flow *client.Flow context interfaces.GraphContext originalName string - instanceName string currentGraph interfaces.DependencyGraph } @@ -52,19 +49,12 @@ func (flowTemplateFactory) Kind() string { func (flowTemplateFactory) New(def client.ResourceDefinition, c client.Interface, gc interfaces.GraphContext) interfaces.Resource { newFlow := parametrizeResource(def.Flow, gc, []string{"*"}).(*client.Flow) - dep := gc.Dependency() - var depName string - if dep != nil { - depName = strings.Replace(dep.Name, dep.GenerateName, "", 1) - } - return report.SimpleReporter{ BaseResource: &flow{ Base: Base{def.Meta}, flow: newFlow, context: gc, originalName: def.Flow.Name, - instanceName: fmt.Sprintf("%s%s", depName, gc.GetArg("AC_NAME")), }} } @@ -88,20 +78,13 @@ func (f *flow) buildDependencyGraph(replicaCount int, silent bool) (interfaces.D args[arg] = val } } - fixedNumberOfReplicas := false - if replicaCount > 0 { - fixedNumberOfReplicas = f.context.Graph().Options().FixedNumberOfReplicas - } else if replicaCount == 0 { - fixedNumberOfReplicas = true - replicaCount = -1 - } options := interfaces.DependencyGraphOptions{ FlowName: f.originalName, Args: args, - FlowInstanceName: f.instanceName, + FlowInstanceName: f.context.GetArg("AC_ID"), ReplicaCount: replicaCount, Silent: silent, - FixedNumberOfReplicas: fixedNumberOfReplicas, + FixedNumberOfReplicas: true, } graph, err := f.context.Scheduler().BuildDependencyGraph(options) @@ -141,7 +124,7 @@ func (f *flow) Create() error { // Delete is called during dlow destruction which can happen only once while Create ensures that at least one flow // replica exists, and as such can be called any number of times func (f flow) Delete() error { - graph, err := f.buildDependencyGraph(-1, false) + graph, err := f.buildDependencyGraph(0, false) if err != nil { return err } @@ -155,7 +138,7 @@ func (f flow) Status(meta map[string]string) (interfaces.ResourceStatus, error) graph := f.currentGraph if graph == nil { var err error - graph, err = f.buildDependencyGraph(0, true) + graph, err = f.buildDependencyGraph(-1, true) if err != nil { return interfaces.ResourceError, err } diff --git a/pkg/scheduler/controller.go b/pkg/scheduler/controller.go index f92a733..264e398 100644 --- a/pkg/scheduler/controller.go +++ b/pkg/scheduler/controller.go @@ -140,7 +140,6 @@ func deployTasks(taskQueue *list.List, mutex *sync.Mutex, cond *sync.Cond, clien cond.L.Unlock() if processing != nil { if abortChan != nil { - abortChan <- struct{}{} close(abortChan) abortChan = nil processing = nil diff --git a/pkg/scheduler/dependency_graph.go b/pkg/scheduler/dependency_graph.go index 8dab597..c093c0e 100644 --- a/pkg/scheduler/dependency_graph.go +++ b/pkg/scheduler/dependency_graph.go @@ -19,6 +19,7 @@ import ( "fmt" "log" "sort" + "strconv" "strings" "time" @@ -33,19 +34,19 @@ import ( ) type dependencyGraph struct { - graph map[string]*ScheduledResource + graph map[string]*scheduledResource scheduler *scheduler graphOptions interfaces.DependencyGraphOptions - finalizer func() + finalizer func(stopChan <-chan struct{}) } type graphContext struct { - args map[string]string - graph *dependencyGraph - scheduler *scheduler - flow *client.Flow - dependency *client.Dependency - replica string + args map[string]string + graph *dependencyGraph + scheduler *scheduler + flow *client.Flow + id string + replica string } var _ interfaces.GraphContext = &graphContext{} @@ -62,6 +63,8 @@ func (gc graphContext) GetArg(name string) string { return gc.replica case "AC_FLOW_NAME": return gc.flow.Name + case "AC_ID": + return gc.id default: val, ok := gc.args[name] if ok { @@ -84,21 +87,16 @@ func (gc graphContext) Graph() interfaces.DependencyGraph { return gc.graph } -// Dependency returns Dependency for which child is the resource being created with this context -func (gc graphContext) Dependency() *client.Dependency { - return gc.dependency -} - // newScheduledResourceFor returns new scheduled resource for given resource in init state -func newScheduledResourceFor(r interfaces.Resource, suffix string, context *graphContext, existing bool) *ScheduledResource { - return &ScheduledResource{ - Started: false, - Ignored: false, - Error: nil, +func newScheduledResourceFor(r interfaces.Resource, suffix string, context *graphContext, existing bool) *scheduledResource { + return &scheduledResource{ + started: false, + ignored: false, + error: nil, Resource: r, - Meta: map[string]map[string]string{}, + meta: map[string]map[string]string{}, context: context, - Existing: existing, + existing: existing, suffix: copier.EvaluateString(suffix, getArgFunc(context)), } } @@ -125,7 +123,13 @@ func (d *sortableDependencyList) Swap(i, j int) { d.Items[i], d.Items[j] = d.Items[j], d.Items[i] } -func (sched *scheduler) getDependencies() ([]client.Dependency, error) { +func (sched *scheduler) getDependencies(silent bool) ([]client.Dependency, error) { + if sched.dependencyCache != nil { + return sched.dependencyCache, nil + } + if !silent { + log.Println("Getting dependencies") + } depList, err := sched.client.Dependencies().List(api.ListOptions{LabelSelector: sched.selector}) if err != nil { return nil, err @@ -133,6 +137,7 @@ func (sched *scheduler) getDependencies() ([]client.Dependency, error) { sortableDepList := sortableDependencyList(*depList) sort.Stable(&sortableDepList) + sched.dependencyCache = sortableDepList.Items return sortableDepList.Items, nil } @@ -159,7 +164,9 @@ func groupDependencies(dependencies []client.Dependency, defaultFlow = []client.Dependency{} addResource := func(name string) { if !strings.HasPrefix(name, "flow/") && !isDependant[name] { - defaultFlow = append(defaultFlow, client.Dependency{Parent: defaultFlowName, Child: name}) + dep := client.Dependency{Parent: defaultFlowName, Child: name} + dep.Name = name + defaultFlow = append(defaultFlow, dep) isDependant[name] = true } } @@ -184,7 +191,13 @@ func getResourceName(resourceDefinition client.ResourceDefinition) (string, stri return "", "" } -func (sched *scheduler) getResourceDefinitions() (map[string]client.ResourceDefinition, error) { +func (sched *scheduler) getResourceDefinitions(silent bool) (map[string]client.ResourceDefinition, error) { + if sched.resDefsCache != nil { + return sched.resDefsCache, nil + } + if !silent { + log.Println("Getting resource definitions") + } resDefList, err := sched.client.ResourceDefinitions().List(api.ListOptions{LabelSelector: sched.selector}) if err != nil { return nil, err @@ -197,6 +210,7 @@ func (sched *scheduler) getResourceDefinitions() (map[string]client.ResourceDefi } result[kind+"/"+name] = resDef } + sched.resDefsCache = result return result, nil } @@ -254,7 +268,7 @@ func isMapContainedIn(contained, containing map[string]string) bool { // newScheduledResource is a constructor for ScheduledResource func (sched scheduler) newScheduledResource(kind, name, suffix string, resDefs map[string]client.ResourceDefinition, - gc *graphContext, silent bool) (*ScheduledResource, error) { + gc *graphContext, silent bool) (*scheduledResource, error) { var r interfaces.Resource resourceTemplate, ok := resources.KindToResourceTemplate[kind] @@ -310,7 +324,7 @@ func keyParts(key string) (kind, name, suffix string, err error) { func newDependencyGraph(sched *scheduler, options interfaces.DependencyGraphOptions) *dependencyGraph { return &dependencyGraph{ - graph: make(map[string]*ScheduledResource), + graph: make(map[string]*scheduledResource), scheduler: sched, graphOptions: options, } @@ -328,11 +342,11 @@ func getArgFunc(gc interfaces.GraphContext) func(string) string { func (sched *scheduler) prepareContext(parentContext *graphContext, dependency *client.Dependency, replica string) *graphContext { context := &graphContext{ - scheduler: sched, - graph: parentContext.graph, - flow: parentContext.flow, - replica: replica, - dependency: dependency, + scheduler: sched, + graph: parentContext.graph, + flow: parentContext.flow, + replica: replica, + id: getVertexID(dependency, replica), } context.args = make(map[string]string) @@ -344,6 +358,15 @@ func (sched *scheduler) prepareContext(parentContext *graphContext, dependency * return context } +func getVertexID(dependency *client.Dependency, replica string) string { + var depName string + if dependency != nil { + depName = strings.Replace(dependency.Name, dependency.GenerateName, "", 1) + } + depName += replica + return depName +} + func (sched *scheduler) updateContext(context, parentContext *graphContext, dependency client.Dependency) { for key, value := range dependency.Args { context.args[key] = copier.EvaluateString(value, parentContext.GetArg) @@ -558,10 +581,7 @@ func (sched *scheduler) BuildDependencyGraph(options interfaces.DependencyGraphO options.FlowName = interfaces.DefaultFlowName } - if !options.Silent { - log.Println("Getting resource definitions") - } - resDefs, err := sched.getResourceDefinitions() + resDefs, err := sched.getResourceDefinitions(options.Silent) if err != nil { return nil, err } @@ -586,10 +606,7 @@ func (sched *scheduler) BuildDependencyGraph(options interfaces.DependencyGraphO return nil, err } - if !options.Silent { - log.Println("Getting dependencies") - } - depList, err := sched.getDependencies() + depList, err := sched.getDependencies(options.Silent) if err != nil { return nil, err } @@ -597,8 +614,11 @@ func (sched *scheduler) BuildDependencyGraph(options interfaces.DependencyGraphO if !options.Silent { log.Println("Making sure there is no cycles in the dependency graph") } - if err = EnsureNoCycles(depList, resDefs); err != nil { - return nil, err + if !sched.graphHasNoCycles { + if err = EnsureNoCycles(depList, resDefs); err != nil { + return nil, err + } + sched.graphHasNoCycles = true } dependencies := groupDependencies(depList, resDefs) @@ -626,8 +646,8 @@ func (sched *scheduler) BuildDependencyGraph(options interfaces.DependencyGraphO } for _, value := range depGraph.graph { - value.RequiredBy = unique(value.RequiredBy) - value.Requires = unique(value.Requires) + value.requiredBy = unique(value.requiredBy) + value.requires = unique(value.requires) value.usedInReplicas = unique(value.usedInReplicas) } @@ -661,29 +681,130 @@ func (sched *scheduler) BuildDependencyGraph(options interfaces.DependencyGraphO return depGraph, nil } +func listDependencies(dependencies map[string][]client.Dependency, parent string, flow *client.Flow, + useDestructionSelector bool, context *graphContext) []client.Dependency { + + deps := filterDependencies(dependencies, parent, flow, useDestructionSelector) + var result []client.Dependency + for _, dep := range deps { + if len(dep.GenerateFor) == 0 { + result = append(result, dep) + continue + } + + var keys []string + for k := range dep.GenerateFor { + keys = append(keys, k) + } + sort.Strings(keys) + lists := make([][]string, len(dep.GenerateFor)) + for i, key := range keys { + lists[i] = expandListExpression(copier.EvaluateString(dep.GenerateFor[key], getArgFunc(context))) + } + for n, combination := range permute(lists) { + newArgs := make(map[string]string, len(dep.Args)+len(keys)) + for k, v := range dep.Args { + newArgs[k] = v + } + for i, key := range keys { + newArgs[key] = combination[i] + } + depCopy := dep + depCopy.Args = newArgs + depCopy.Name += strconv.Itoa(n + 1) + result = append(result, depCopy) + } + } + return result +} + +func permute(variants [][]string) [][]string { + switch len(variants) { + case 0: + return variants + case 1: + var result [][]string + for _, v := range variants[0] { + result = append(result, []string{v}) + } + return result + default: + var result [][]string + for _, tail := range variants[len(variants)-1] { + for _, p := range permute(variants[:len(variants)-1]) { + result = append(result, append(p, tail)) + } + } + return result + } +} + +func expandListExpression(expr string) []string { + var result []string + for _, part := range strings.Split(expr, ",") { + part = strings.TrimSpace(part) + if part == "" { + continue + } + + isRange := true + var from, to int + + rangeParts := strings.SplitN(part, "..", 2) + if len(rangeParts) != 2 { + isRange = false + } + + var err error + if isRange { + from, err = strconv.Atoi(rangeParts[0]) + if err != nil { + isRange = false + } + } + if isRange { + to, err = strconv.Atoi(rangeParts[1]) + if err != nil { + isRange = false + } + } + + if isRange { + for i := from; i <= to; i++ { + result = append(result, strconv.Itoa(i)) + } + } else { + result = append(result, part) + } + } + return result +} + +type interimGraphVertex struct { + dependency client.Dependency + scheduledResource *scheduledResource + parentContext *graphContext +} + func (sched *scheduler) fillDependencyGraph(rootContext *graphContext, resDefs map[string]client.ResourceDefinition, dependencies map[string][]client.Dependency, flow *client.Flow, replicas []client.Replica, useDestructionSelector bool) error { - type Block struct { - dependency client.Dependency - scheduledResource *ScheduledResource - parentContext *graphContext - } - blocks := map[string][]*Block{} + var vertices [][]interimGraphVertex silent := rootContext.graph.Options().Silent for _, replica := range replicas { + var replicaVertices []interimGraphVertex replicaName := replica.ReplicaName() replicaContext := sched.prepareContext(rootContext, nil, replicaName) queue := list.New() - queue.PushFront(&Block{dependency: client.Dependency{Child: "flow/" + flow.Name}}) + queue.PushFront(interimGraphVertex{dependency: client.Dependency{Child: "flow/" + flow.Name}}) for e := queue.Front(); e != nil; e = e.Next() { - parent := e.Value.(*Block) + parent := e.Value.(interimGraphVertex) - deps := filterDependencies(dependencies, parent.dependency.Child, flow, useDestructionSelector) + deps := listDependencies(dependencies, parent.dependency.Child, flow, useDestructionSelector, replicaContext) for _, dep := range deps { if parent.scheduledResource != nil && strings.HasPrefix(parent.scheduledResource.Key(), "flow/") { @@ -707,50 +828,156 @@ func (sched *scheduler) fillDependencyGraph(rootContext *graphContext, } sr.usedInReplicas = []string{replicaName} - block := &Block{ + vertex := interimGraphVertex{ scheduledResource: sr, dependency: dep, parentContext: parentContext, } - - blocks[dep.Child] = append(blocks[dep.Child], block) + replicaVertices = append(replicaVertices, vertex) if parent.scheduledResource != nil { - sr.Requires = append(sr.Requires, parent.scheduledResource.Key()) - parent.scheduledResource.RequiredBy = append(parent.scheduledResource.RequiredBy, sr.Key()) - sr.Meta[parent.dependency.Child] = dep.Meta + sr.requires = append(sr.requires, parent.scheduledResource.Key()) + parent.scheduledResource.requiredBy = append(parent.scheduledResource.requiredBy, sr.Key()) + sr.meta[parent.dependency.Child] = dep.Meta } - queue.PushBack(block) + queue.PushBack(vertex) } } - for _, block := range blocks { - for _, entry := range block { - key := entry.scheduledResource.Key() - existingSr := rootContext.graph.graph[key] - if existingSr == nil { - if !silent { - log.Printf("Adding resource %s to the dependency graph flow %s", key, flow.Name) - } - rootContext.graph.graph[key] = entry.scheduledResource - } else { - sched.updateContext(existingSr.context, entry.parentContext, entry.dependency) - existingSr.Requires = append(existingSr.Requires, entry.scheduledResource.Requires...) - existingSr.RequiredBy = append(existingSr.RequiredBy, entry.scheduledResource.RequiredBy...) - existingSr.usedInReplicas = append(existingSr.usedInReplicas, entry.scheduledResource.usedInReplicas...) - for metaKey, metaValue := range entry.scheduledResource.Meta { - existingSr.Meta[metaKey] = metaValue - } + vertices = append(vertices, replicaVertices) + } + + if flow.Sequential { + sched.concatenateReplicas(vertices, rootContext, rootContext.graph.Options()) + } else { + sched.mergeReplicas(vertices, rootContext, rootContext.graph.Options()) + } + return nil +} + +func (sched *scheduler) mergeReplicas(vertices [][]interimGraphVertex, gc *graphContext, + options interfaces.DependencyGraphOptions) { + + for _, replicaVertices := range vertices { + sched.mergeInterimGraphVertices(replicaVertices, gc.graph.graph, options) + } +} + +func (sched *scheduler) concatenateReplicas(vertices [][]interimGraphVertex, gc *graphContext, + options interfaces.DependencyGraphOptions) { + graph := gc.graph.graph + var previousReplicaGraph map[string]*scheduledResource + for i, replicaVertices := range vertices { + replicaGraph := map[string]*scheduledResource{} + sched.mergeInterimGraphVertices(replicaVertices, replicaGraph, options) + + if i > 0 { + correctDuplicateResources(graph, replicaGraph, i) + + for _, leafName := range getLeafs(previousReplicaGraph) { + for _, rootName := range getRoots(replicaGraph) { + root := replicaGraph[rootName] + leaf := previousReplicaGraph[leafName] + root.requires = append(root.requires, leafName) + leaf.requiredBy = append(leaf.requiredBy, rootName) } } } + previousReplicaGraph = replicaGraph + for key, value := range replicaGraph { + graph[key] = value + } + } +} + +func correctDuplicateResources(existingGraph, newGraph map[string]*scheduledResource, index int) { + toReplace := map[string]*scheduledResource{} + for key, sr := range newGraph { + if existingGraph[key] != nil { + toReplace[key] = sr + } + } + for key, sr := range toReplace { + sr.context.id = existingGraph[key].context.id + j := index + 1 + suffix := sr.suffix + for { + sr.suffix = fmt.Sprintf("%s #%d", suffix, j) + if existingGraph[sr.Key()] == nil { + break + } + j++ + } + for _, rKey := range sr.requiredBy { + requires := newGraph[rKey].requires + for i, rKey2 := range requires { + if rKey2 == key { + requires[i] = sr.Key() + break + } + } + } + for _, rKey := range sr.requires { + requiredBy := newGraph[rKey].requiredBy + for i, rKey2 := range requiredBy { + if rKey2 == key { + requiredBy[i] = sr.Key() + break + } + } + } + delete(newGraph, key) + newGraph[sr.Key()] = sr + } +} + +func getRoots(graph map[string]*scheduledResource) []string { + var result []string + for key, sr := range graph { + if len(sr.requires) == 0 { + result = append(result, key) + } + } + return result +} + +func getLeafs(graph map[string]*scheduledResource) []string { + var result []string + for key, sr := range graph { + if len(sr.requiredBy) == 0 { + result = append(result, key) + } + } + return result +} + +func (sched *scheduler) mergeInterimGraphVertices(vertices []interimGraphVertex, graph map[string]*scheduledResource, + options interfaces.DependencyGraphOptions) { + + for _, entry := range vertices { + key := entry.scheduledResource.Key() + existingSr := graph[key] + if existingSr == nil { + if !options.Silent { + log.Printf("Adding resource %s to the dependency graph flow %s", key, options.FlowName) + } + graph[key] = entry.scheduledResource + } else { + sched.updateContext(existingSr.context, entry.parentContext, entry.dependency) + existingSr.requires = append(existingSr.requires, entry.scheduledResource.requires...) + existingSr.requiredBy = append(existingSr.requiredBy, entry.scheduledResource.requiredBy...) + existingSr.usedInReplicas = append(existingSr.usedInReplicas, entry.scheduledResource.usedInReplicas...) + for metaKey, metaValue := range entry.scheduledResource.meta { + existingSr.meta[metaKey] = metaValue + } + } } - return nil } // getResourceDestructors builds a list of functions, each of them delete one of replica resources -func getResourceDestructors(construction, destruction *dependencyGraph, replicaMap map[string]client.Replica, failed *chan *ScheduledResource) []func() bool { - var destructors []func() bool +func getResourceDestructors(construction, destruction *dependencyGraph, replicaMap map[string]client.Replica, + failed *chan *scheduledResource) []func(<-chan struct{}) bool { + var destructors []func(<-chan struct{}) bool for _, depGraph := range [2]*dependencyGraph{construction, destruction} { for _, resource := range depGraph.graph { resourceCanBeDeleted := true @@ -768,8 +995,8 @@ func getResourceDestructors(construction, destruction *dependencyGraph, replicaM return destructors } -func getDestructorFunc(resource *ScheduledResource, failed *chan *ScheduledResource) func() bool { - return func() bool { +func getDestructorFunc(resource *scheduledResource, failed *chan *scheduledResource) func(<-chan struct{}) bool { + return func(<-chan struct{}) bool { res := deleteResource(resource) if res != nil { *failed <- resource @@ -780,10 +1007,12 @@ func getDestructorFunc(resource *ScheduledResource, failed *chan *ScheduledResou } // deleteReplicaResources invokes resources destructors and deletes replicas for which 100% of resources were deleted -func deleteReplicaResources(sched *scheduler, destructors []func() bool, replicaMap map[string]client.Replica, failed *chan *ScheduledResource) { - *failed = make(chan *ScheduledResource, len(destructors)) +func deleteReplicaResources(sched *scheduler, destructors []func(<-chan struct{}) bool, replicaMap map[string]client.Replica, + failed *chan *scheduledResource, stopChan <-chan struct{}) { + + *failed = make(chan *scheduledResource, len(destructors)) defer close(*failed) - deleted := runConcurrently(destructors, sched.concurrency) + deleted := runConcurrently(destructors, sched.concurrency, stopChan) failedReplicas := map[string]bool{} if !deleted { log.Println("Some of resources were not deleted") @@ -800,7 +1029,7 @@ readFailed: break readFailed } } - var deleteReplicaFuncs []func() bool + var deleteReplicaFuncs []func(<-chan struct{}) bool for replicaName, replicaObject := range replicaMap { if _, found := failedReplicas[replicaName]; found { @@ -808,7 +1037,7 @@ readFailed: } replicaNameCopy := replicaName replicaObjectCopy := replicaObject - deleteReplicaFuncs = append(deleteReplicaFuncs, func() bool { + deleteReplicaFuncs = append(deleteReplicaFuncs, func(<-chan struct{}) bool { log.Printf("%s flow: Deleting replica %s", replicaObjectCopy.FlowName, replicaNameCopy) err := sched.client.Replicas().Delete(replicaObjectCopy.Name) if err != nil { @@ -818,28 +1047,28 @@ readFailed: }) } - if deleteReplicaFuncs != nil && !runConcurrently(deleteReplicaFuncs, sched.concurrency) { + if deleteReplicaFuncs != nil && !runConcurrently(deleteReplicaFuncs, sched.concurrency, stopChan) { log.Println("Some of flow replicas were not deleted") } } -func (sched *scheduler) composeDeletingFinalizer(construction, destruction *dependencyGraph, replicas []client.Replica) func() { +func (sched *scheduler) composeDeletingFinalizer(construction, destruction *dependencyGraph, replicas []client.Replica) func(<-chan struct{}) { replicaMap := map[string]client.Replica{} for _, replica := range replicas { replicaMap[replica.ReplicaName()] = replica } - var failed chan *ScheduledResource + var failed chan *scheduledResource destructors := getResourceDestructors(construction, destruction, replicaMap, &failed) - return func() { + return func(stopChan <-chan struct{}) { log.Print("Performing resource cleanup") - deleteReplicaResources(sched, destructors, replicaMap, &failed) + deleteReplicaResources(sched, destructors, replicaMap, &failed, stopChan) } } -func makeAcknowledgeReplicaFunc(replica client.Replica, api client.ReplicasInterface) func() bool { - return func() bool { +func makeAcknowledgeReplicaFunc(replica client.Replica, api client.ReplicasInterface) func(<-chan struct{}) bool { + return func(<-chan struct{}) bool { replica.Deployed = true log.Printf("%s flow: Marking replica %s as deployed", replica.FlowName, replica.ReplicaName()) if err := api.Update(&replica); err != nil { @@ -850,16 +1079,16 @@ func makeAcknowledgeReplicaFunc(replica client.Replica, api client.ReplicasInter } } -func (sched *scheduler) composeAcknowledgingFinalizer(replicas []client.Replica) func() { - var funcs []func() bool +func (sched *scheduler) composeAcknowledgingFinalizer(replicas []client.Replica) func(<-chan struct{}) { + var funcs []func(<-chan struct{}) bool for _, replica := range replicas { if !replica.Deployed { funcs = append(funcs, makeAcknowledgeReplicaFunc(replica, sched.client.Replicas())) } } - return func() { - if !runConcurrently(funcs, sched.concurrency) { + return func(stopChan <-chan struct{}) { + if !runConcurrently(funcs, sched.concurrency, stopChan) { log.Println("Some of the replicas were not updated!") } } diff --git a/pkg/scheduler/dependency_graph_test.go b/pkg/scheduler/dependency_graph_test.go index 9cc7068..07ae319 100644 --- a/pkg/scheduler/dependency_graph_test.go +++ b/pkg/scheduler/dependency_graph_test.go @@ -15,6 +15,7 @@ package scheduler import ( + "strings" "testing" "github.com/Mirantis/k8s-AppController/pkg/client" @@ -319,3 +320,76 @@ func TestDependencyToFlowMatching(t *testing.T) { } } } + +// TestPermute tests permute function +func TestPermute(t *testing.T) { + alphabets := [][]string{ + {"1", "2", "3"}, + {"+", "-"}, + {"a", "b"}, + {"="}, + } + + expected := map[string]bool{ + "1+a=": true, + "1+b=": true, + "1-a=": true, + "1-b=": true, + "2+a=": true, + "2+b=": true, + "2-a=": true, + "2-b=": true, + "3+a=": true, + "3+b=": true, + "3-a=": true, + "3-b=": true, + } + permutations := permute(alphabets) + for _, combination := range permutations { + combinationStr := strings.Join(combination, "") + if !expected[combinationStr] { + t.Errorf("unexpected combination %s", combinationStr) + } else { + delete(expected, combinationStr) + } + } + if len(expected) != 0 { + t.Error("not all combinations were generated") + } + + alphabets = append(alphabets, make([]string, 0)) + if len(permute(alphabets)) != 0 { + t.Error("empty alphabet didin't result in empty permutation list") + } +} + +// TestExpendListExpression tests list expression translation to list of strings +func TestExpendListExpression(t *testing.T) { + table := map[string][]string{ + "1": {"1"}, + "1..5": {"1", "2", "3", "4", "5"}, + "2..-1": {}, + "a, b": {"a", "b"}, + "a, b, 2..4": {"a", "b", "2", "3", "4"}, + "-1..1, 2..4, x": {"-1", "0", "1", "2", "3", "4", "x"}, + "a..b": {"a..b"}, + "..": {".."}, + "1...3": {"1...3"}, + "1..b": {"1..b"}, + "a..b, 1..3": {"a..b", "1", "2", "3"}, + "a..b, c..d": {"a..b", "c..d"}, + "": {}, + } + for expr, expected := range table { + result := expandListExpression(expr) + if len(result) != len(expected) { + t.Errorf("unexpected result length for expression %s: %d != %d", expr, len(result), len(expected)) + } else { + for i := range expected { + if expected[i] != result[i] { + t.Errorf("invalid entry %d for expression %s: %s != %s", i, expr, expected[i], result[i]) + } + } + } + } +} diff --git a/pkg/scheduler/flows_test.go b/pkg/scheduler/flows_test.go index 464865c..5713179 100644 --- a/pkg/scheduler/flows_test.go +++ b/pkg/scheduler/flows_test.go @@ -144,8 +144,7 @@ func TestTriggerFlowIndependently(t *testing.T) { t.Fatal("Job list is not empty") } - stopChan := make(chan struct{}) - depGraph.Deploy(stopChan) + depGraph.Deploy(nil) jobs, err = c.Jobs().List(api_v1.ListOptions{}) if err != nil { @@ -217,8 +216,7 @@ func TestTriggerOneFlowFromAnother(t *testing.T) { t.Fatal(err) } - stopChan := make(chan struct{}) - depGraph.Deploy(stopChan) + depGraph.Deploy(nil) jobs, err := c.Jobs().List(api_v1.ListOptions{}) if err != nil { @@ -262,8 +260,7 @@ func TestParameterPassing(t *testing.T) { t.Fatal(err) } - stopChan := make(chan struct{}) - depGraph.Deploy(stopChan) + depGraph.Deploy(nil) jobs, err := c.Jobs().List(api_v1.ListOptions{}) if err != nil { @@ -306,8 +303,7 @@ func TestMultipathParameterPassing(t *testing.T) { t.Fatal(err) } - stopChan := make(chan struct{}) - depGraph.Deploy(stopChan) + depGraph.Deploy(nil) jobs, err := c.Jobs().List(api_v1.ListOptions{}) if err != nil { @@ -354,8 +350,7 @@ func TestParametrizedFlow(t *testing.T) { t.Fatal(err) } - stopChan := make(chan struct{}) - depGraph.Deploy(stopChan) + depGraph.Deploy(nil) jobs, err := c.Jobs().List(api_v1.ListOptions{}) if err != nil { @@ -396,8 +391,7 @@ func TestAcNameParameter(t *testing.T) { t.Fatal(err) } - stopChan := make(chan struct{}) - depGraph.Deploy(stopChan) + depGraph.Deploy(nil) replicas, err := c.Replicas().List(api.ListOptions{}) if err != nil { @@ -444,8 +438,7 @@ func TestUseUndeclaredFlowParameter(t *testing.T) { t.Fatal(err) } - stopChan := make(chan struct{}) - depGraph.Deploy(stopChan) + depGraph.Deploy(nil) jobs, err := c.Jobs().List(api_v1.ListOptions{}) if err != nil { @@ -538,8 +531,7 @@ func TestParameterPassingBetweenFlows(t *testing.T) { t.Fatal(err) } - stopChan := make(chan struct{}) - depGraph.Deploy(stopChan) + depGraph.Deploy(nil) jobs, err := c.Jobs().List(api_v1.ListOptions{}) if err != nil { @@ -596,8 +588,7 @@ func TestReplication(t *testing.T) { t.Fatal(err) } - stopChan := make(chan struct{}) - depGraph.Deploy(stopChan) + depGraph.Deploy(nil) jobs := ensureReplicas(c, t, 2*replicaCount, replicaCount) @@ -625,8 +616,7 @@ func TestReplicationWithSharedResources(t *testing.T) { t.Fatal(err) } - stopChan := make(chan struct{}) - depGraph.Deploy(stopChan) + depGraph.Deploy(nil) jobs := ensureReplicas(c, t, replicaCount+1, replicaCount) @@ -664,8 +654,7 @@ func TestReplicationScaleUp(t *testing.T) { t.Fatal(err) } - stopChan := make(chan struct{}) - depGraph.Deploy(stopChan) + depGraph.Deploy(nil) ensureReplicas(c, t, initialReplicaCount, initialReplicaCount) @@ -679,7 +668,7 @@ func TestReplicationScaleUp(t *testing.T) { t.Fatal(err) } - depGraph.Deploy(stopChan) + depGraph.Deploy(nil) ensureReplicas(c, t, adjustedReplicaCount, adjustedReplicaCount) @@ -693,7 +682,7 @@ func TestReplicationScaleUp(t *testing.T) { t.Fatal(err) } - depGraph.Deploy(stopChan) + depGraph.Deploy(nil) jobs := ensureReplicas(c, t, adjustedReplicaCount+replicaCountDelta, adjustedReplicaCount+replicaCountDelta) @@ -718,8 +707,7 @@ func TestNoOpReplication(t *testing.T) { t.Fatal(err) } - stopChan := make(chan struct{}) - depGraph.Deploy(stopChan) + depGraph.Deploy(nil) ensureReplicas(c, t, 0, 0) } @@ -749,8 +737,7 @@ func TestCompositionFlowReplication(t *testing.T) { t.Fatal(err) } - stopChan := make(chan struct{}) - depGraph.Deploy(stopChan) + depGraph.Deploy(nil) jobs := ensureReplicas(c, t, 4*replicaCount, 2*replicaCount) @@ -802,8 +789,7 @@ func TestDestruction(t *testing.T) { t.Fatal(err) } - stopChan := make(chan struct{}) - depGraph.Deploy(stopChan) + depGraph.Deploy(nil) ensureReplicas(c, t, 6, 5) @@ -812,7 +798,7 @@ func TestDestruction(t *testing.T) { if err != nil { t.Fatal(err) } - depGraph.Deploy(stopChan) + depGraph.Deploy(nil) ensureReplicas(c, t, 4, 3) @@ -821,7 +807,7 @@ func TestDestruction(t *testing.T) { if err != nil { t.Fatal(err) } - depGraph.Deploy(stopChan) + depGraph.Deploy(nil) jobs := ensureReplicas(c, t, 2, 1) if jobs[0].Name != "ready-b" && jobs[1].Name != "ready-b" { @@ -833,7 +819,7 @@ func TestDestruction(t *testing.T) { if err != nil { t.Fatal(err) } - depGraph.Deploy(stopChan) + depGraph.Deploy(nil) ensureReplicas(c, t, 0, 0) } @@ -859,8 +845,7 @@ func TestCompositeFlowDestruction(t *testing.T) { t.Fatal(err) } - stopChan := make(chan struct{}) - depGraph.Deploy(stopChan) + depGraph.Deploy(nil) ensureReplicas(c, t, 15, 10) @@ -869,7 +854,7 @@ func TestCompositeFlowDestruction(t *testing.T) { if err != nil { t.Fatal(err) } - depGraph.Deploy(stopChan) + depGraph.Deploy(nil) ensureReplicas(c, t, 9, 6) @@ -878,7 +863,7 @@ func TestCompositeFlowDestruction(t *testing.T) { if err != nil { t.Fatal(err) } - depGraph.Deploy(stopChan) + depGraph.Deploy(nil) ensureReplicas(c, t, 3, 2) @@ -887,7 +872,7 @@ func TestCompositeFlowDestruction(t *testing.T) { if err != nil { t.Fatal(err) } - depGraph.Deploy(stopChan) + depGraph.Deploy(nil) ensureReplicas(c, t, 0, 0) } @@ -944,8 +929,7 @@ func TestCleanupResources(t *testing.T) { t.Fatal(err) } - stopChan := make(chan struct{}) - depGraph.Deploy(stopChan) + depGraph.Deploy(nil) ensureReplicas(c, t, 1, 1) if !(aCreated && !aDeleted && !bCreated && !bDeleted && !cCreated && !cDeleted) { @@ -958,7 +942,7 @@ func TestCleanupResources(t *testing.T) { t.Fatal(err) } - depGraph.Deploy(stopChan) + depGraph.Deploy(nil) ensureReplicas(c, t, 0, 0) if !(aCreated && aDeleted && bCreated && bDeleted && cCreated && cDeleted) { @@ -991,8 +975,7 @@ func TestSharedReplicaSpace(t *testing.T) { t.Fatal(err) } - stopChan := make(chan struct{}) - depGraph.Deploy(stopChan) + depGraph.Deploy(nil) ensureReplicas(c, t, 2, 1) @@ -1002,7 +985,7 @@ func TestSharedReplicaSpace(t *testing.T) { t.Fatal(err) } - depGraph.Deploy(stopChan) + depGraph.Deploy(nil) ensureReplicas(c, t, 0, 0) @@ -1012,7 +995,7 @@ func TestSharedReplicaSpace(t *testing.T) { t.Fatal(err) } - depGraph.Deploy(stopChan) + depGraph.Deploy(nil) ensureReplicas(c, t, 2, 1) @@ -1022,7 +1005,7 @@ func TestSharedReplicaSpace(t *testing.T) { t.Fatal(err) } - depGraph.Deploy(stopChan) + depGraph.Deploy(nil) ensureReplicas(c, t, 0, 0) } @@ -1043,8 +1026,7 @@ func TestDeleteExistingResources(t *testing.T) { t.Fatal(err) } - stopChan := make(chan struct{}) - depGraph.Deploy(stopChan) + depGraph.Deploy(nil) ensureReplicas(c, t, 1, 1) @@ -1053,7 +1035,7 @@ func TestDeleteExistingResources(t *testing.T) { if err != nil { t.Fatal(err) } - depGraph.Deploy(stopChan) + depGraph.Deploy(nil) ensureReplicas(c, t, 1, 0, "invalid attempt to delete external resources") @@ -1063,7 +1045,7 @@ func TestDeleteExistingResources(t *testing.T) { t.Fatal(err) } - depGraph.Deploy(stopChan) + depGraph.Deploy(nil) ensureReplicas(c, t, 1, 1) @@ -1072,7 +1054,7 @@ func TestDeleteExistingResources(t *testing.T) { if err != nil { t.Fatal(err) } - depGraph.Deploy(stopChan) + depGraph.Deploy(nil) ensureReplicas(c, t, 0, 0, "valid attempt to delete external resources") } @@ -1103,8 +1085,7 @@ func TestCleanupResourcesErrorHandling(t *testing.T) { t.Fatal(err) } - stopChan := make(chan struct{}) - depGraph.Deploy(stopChan) + depGraph.Deploy(nil) ensureReplicas(c, t, 2, 2) @@ -1114,7 +1095,7 @@ func TestCleanupResourcesErrorHandling(t *testing.T) { t.Fatal(err) } - depGraph.Deploy(stopChan) + depGraph.Deploy(nil) ensureReplicas(c, t, 1, 1) } @@ -1137,7 +1118,7 @@ func TestDeploymentRecoveryForRelativeReplicaCount(t *testing.T) { func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) { if !prevented { prevented = true - stopChan <- struct{}{} + close(stopChan) return true, nil, errors.New("resource cannot be created") } return false, nil, nil @@ -1166,7 +1147,7 @@ func TestDeploymentRecoveryForRelativeReplicaCount(t *testing.T) { t.Fatal(err) } - depGraph.Deploy(stopChan) + depGraph.Deploy(nil) ensureReplicas(c, t, 1, 2) replicas, _ = c.Replicas().List(api.ListOptions{}) for _, r := range replicas.Items { @@ -1194,9 +1175,7 @@ func TestMultiParentFlow(t *testing.T) { t.Fatal(err) } - stopChan := make(chan struct{}) - depGraph.Deploy(stopChan) - + depGraph.Deploy(nil) ensureReplicas(c, t, 5, 3) } @@ -1219,8 +1198,7 @@ func TestMultiParentFlowWithSuffix(t *testing.T) { t.Fatal(err) } - stopChan := make(chan struct{}) - depGraph.Deploy(stopChan) + depGraph.Deploy(nil) ensureReplicas(c, t, 6, 4) } @@ -1254,8 +1232,7 @@ func TestMultipleFlowCalls(t *testing.T) { t.Fatal(err) } - stopChan := make(chan struct{}) - depGraph.Deploy(stopChan) + depGraph.Deploy(nil) jobs := ensureReplicas(c, t, 4, 3) jobNames := map[string]bool{ @@ -1304,8 +1281,7 @@ func TestMultipathParameterPassingWithSuffix(t *testing.T) { t.Fatal(err) } - stopChan := make(chan struct{}) - depGraph.Deploy(stopChan) + depGraph.Deploy(nil) jobs, err := c.Jobs().List(api_v1.ListOptions{}) if err != nil { @@ -1367,3 +1343,178 @@ func TestSyncOnVoidResource(t *testing.T) { depGraph.Deploy(stopChan) ensureReplicas(c, t, replicaCount, replicaCount) } + +// TestConsumeReplicatedFlow tests case, where each replica of the outer flow consumes N replicas of another flow +// by replicating dependency which leads to the consumed flow +func TestConsumeReplicatedFlow(t *testing.T) { + dep := mocks.MakeDependency("flow/outer", "flow/inner/$AC_NAME-$i", "flow=outer") + dep.GenerateFor = map[string]string{"i": "1..3"} + + c := mocks.NewClient( + mocks.MakeFlow("inner"), + mocks.MakeFlow("outer"), + mocks.MakeResourceDefinition("job/ready-$AC_NAME"), + dep, + mocks.MakeDependency("flow/inner", "job/ready-$AC_NAME", "flow=inner"), + ) + depGraph, err := New(c, nil, 0).BuildDependencyGraph( + interfaces.DependencyGraphOptions{ReplicaCount: 2, FlowName: "outer"}) + if err != nil { + t.Fatal(err) + } + depGraph.Deploy(nil) + + ensureReplicas(c, t, 2*3, 3*2+2) +} + +// TestComplexDependencyReplication tests complex dependency generation over two list expressions +func TestComplexDependencyReplication(t *testing.T) { + dep := mocks.MakeDependency("flow/test", "job/ready-$x-$y", "flow=test") + dep.GenerateFor = map[string]string{ + "x": "1..3, 8..9", + "y": "a, b", + } + + c := mocks.NewClient( + mocks.MakeFlow("test"), + mocks.MakeResourceDefinition("job/ready-$x-$y"), + dep, + ) + depGraph, err := New(c, nil, 0).BuildDependencyGraph( + interfaces.DependencyGraphOptions{ReplicaCount: 1, FlowName: "test"}) + if err != nil { + t.Fatal(err) + } + depGraph.Deploy(nil) + + expectedJobNames := map[string]bool{ + "ready-1-a": true, + "ready-2-a": true, + "ready-3-a": true, + "ready-8-a": true, + "ready-9-a": true, + "ready-1-b": true, + "ready-2-b": true, + "ready-3-b": true, + "ready-8-b": true, + "ready-9-b": true, + } + jobs := ensureReplicas(c, t, len(expectedJobNames), 1) + for _, j := range jobs { + if !expectedJobNames[j.Name] { + t.Errorf("unexpected job %s", j.Name) + } else { + delete(expectedJobNames, j.Name) + } + } + if len(expectedJobNames) != 0 { + t.Error("not all jobs were found") + } +} + +// TestDynamicDependencyReplication tests that variables can be used in list expressions used for dependency replication +func TestDynamicDependencyReplication(t *testing.T) { + flow := mocks.MakeFlow("test") + flow.Flow.Parameters = map[string]client.FlowParameter{ + "replicaCount": mocks.MakeFlowParameter("1"), + } + + dep := mocks.MakeDependency("flow/test", "job/ready-$index", "flow=test") + dep.GenerateFor = map[string]string{ + "index": "1..$replicaCount", + } + + c := mocks.NewClient( + flow, + mocks.MakeResourceDefinition("job/ready-$index"), + dep, + ) + depGraph, err := New(c, nil, 0).BuildDependencyGraph( + interfaces.DependencyGraphOptions{ReplicaCount: 1, FlowName: "test", + Args: map[string]string{"replicaCount": "7"}}) + if err != nil { + t.Fatal(err) + } + depGraph.Deploy(nil) + + ensureReplicas(c, t, 7, 1) +} + +// TestSequentialReplication tests that resources of sequentially replicated flows create in right order +func TestSequentialReplication(t *testing.T) { + replicaCount := 3 + flow := mocks.MakeFlow("test") + flow.Flow.Sequential = true + + c, fake := mocks.NewClientWithFake( + flow, + mocks.MakeResourceDefinition("pod/ready-$AC_NAME"), + mocks.MakeResourceDefinition("secret/secret"), + mocks.MakeResourceDefinition("job/ready-$AC_NAME"), + mocks.MakeDependency("flow/test", "pod/ready-$AC_NAME", "flow=test"), + mocks.MakeDependency("pod/ready-$AC_NAME", "secret/secret", "flow=test"), + mocks.MakeDependency("secret/secret", "job/ready-$AC_NAME", "flow=test"), + ) + + var deployed []string + fake.PrependReactor("create", "*", + func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) { + resource := action.GetResource().Resource + if resource != "replica" { + deployed = append(deployed, resource) + } + + return false, nil, nil + }) + + depGraph, err := New(c, nil, 0).BuildDependencyGraph( + interfaces.DependencyGraphOptions{ReplicaCount: replicaCount, FlowName: "test"}) + if err != nil { + t.Fatal(err) + } + + graph := depGraph.(*dependencyGraph).graph + if len(graph) != 3*replicaCount { + t.Error("wrong dependency graph length") + } + + depGraph.Deploy(nil) + expected := []string{"pods", "secrets", "jobs", "pods", "jobs", "pods", "jobs"} + if len(deployed) != len(expected) { + t.Fatal("invalid resource sequence", deployed) + } + for i, r := range deployed { + if expected[i] != r { + t.Fatal("invalid resource sequence") + } + } + + ensureReplicas(c, t, replicaCount, replicaCount) +} + +// TestSequentialReplicationWithSharedFlow tests that flow consumed as a resource shared by replicas of +// sequentially replicated flow deployed only once +func TestSequentialReplicationWithSharedFlow(t *testing.T) { + replicaCount := 3 + flow := mocks.MakeFlow("outer") + flow.Flow.Sequential = true + + c := mocks.NewClient( + flow, + mocks.MakeFlow("inner"), + mocks.MakeResourceDefinition("job/ready-a$AC_NAME"), + mocks.MakeResourceDefinition("job/ready-b$AC_NAME"), + mocks.MakeDependency("flow/outer", "flow/inner", "flow=outer"), + mocks.MakeDependency("flow/inner", "job/ready-a$AC_NAME", "flow=outer"), + mocks.MakeDependency("flow/inner", "job/ready-b$AC_NAME", "flow=inner"), + ) + + depGraph, err := New(c, nil, 0).BuildDependencyGraph( + interfaces.DependencyGraphOptions{ReplicaCount: replicaCount, FlowName: "outer"}) + if err != nil { + t.Fatal(err) + } + + depGraph.Deploy(nil) + ensureReplicas(c, t, replicaCount+1, replicaCount+1) +} diff --git a/pkg/scheduler/frontend.go b/pkg/scheduler/frontend.go index dffcd92..45174a4 100644 --- a/pkg/scheduler/frontend.go +++ b/pkg/scheduler/frontend.go @@ -30,6 +30,10 @@ type scheduler struct { client client.Interface selector labels.Selector concurrency int + + resDefsCache map[string]client.ResourceDefinition + dependencyCache []client.Dependency + graphHasNoCycles bool } var _ interfaces.Scheduler = &scheduler{} @@ -161,14 +165,11 @@ func Deploy(sched interfaces.Scheduler, options interfaces.DependencyGraphOption if err != nil { return "", err } - var ch <-chan struct{} - if stopChan == nil { - ch := make(chan struct{}) - defer close(ch) + if depGraph.Deploy(stopChan) { + log.Println("Deployment finished sucessfully") } else { - ch = stopChan + log.Println("Deployment failed") } - depGraph.Deploy(ch) } else { log.Printf("Scheduling deployment of %s flow", options.FlowName) var err error @@ -178,7 +179,6 @@ func Deploy(sched interfaces.Scheduler, options interfaces.DependencyGraphOption } log.Printf("Scheduled deployment task %s", task) } - log.Println("Done") return task, nil } diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 43e0031..73ad75f 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -17,6 +17,7 @@ package scheduler import ( "fmt" "log" + "strings" "sync" "time" @@ -34,26 +35,27 @@ const ( WaitTimeout = time.Second * 600 ) -// ScheduledResource is a wrapper for Resource with attached relationship data -type ScheduledResource struct { - Requires []string - RequiredBy []string - Started bool - Ignored bool - Error error - Existing bool +// scheduledResource is a wrapper for Resource with attached relationship data +type scheduledResource struct { + requires []string + requiredBy []string + started bool + ignored bool + skipped bool + error error + existing bool context *graphContext usedInReplicas []string status interfaces.ResourceStatus suffix string interfaces.Resource // parentKey -> dependencyMetadata - Meta map[string]map[string]string + meta map[string]map[string]string sync.RWMutex } // Key returns resource identifier with optional suffix -func (sr *ScheduledResource) Key() string { +func (sr *scheduledResource) Key() string { baseKey := sr.Resource.Key() if sr.suffix == "" { return baseKey @@ -61,76 +63,66 @@ func (sr *ScheduledResource) Key() string { return baseKey + "/" + sr.suffix } -// RequestCreation does not create a scheduled resource immediately, but updates status +// requestCreation does not create a scheduled resource immediately, but updates status // and puts the scheduled resource to corresponding channel. Returns true if // scheduled resource creation was actually requested, false otherwise. -func (sr *ScheduledResource) RequestCreation(toCreate chan *ScheduledResource) bool { +func (sr *scheduledResource) requestCreation(toCreate chan<- *scheduledResource) bool { sr.RLock() // somebody already requested resource creation - if sr.Started { + if sr.started { sr.RUnlock() return true } sr.RUnlock() + isBlocked := sr.isBlocked() && sr.error == nil sr.Lock() defer sr.Unlock() - if !sr.Started && !sr.IsBlocked() { - sr.Started = true + if !sr.started && !isBlocked { + sr.started = true toCreate <- sr return true } return false } -func isResourceFinished(sr *ScheduledResource, ch chan error) bool { +func isResourceFinished(sr *scheduledResource) (bool, error) { status, err := sr.Status(nil) if err != nil { - ch <- err - return true + return status != interfaces.ResourceNotReady, err } if status == interfaces.ResourceReady { - ch <- nil - return true + return true, nil } - return false + return false, nil } -// Wait periodically checks resource status and returns if the resource processing is finished, +// wait periodically checks resource status and returns if the resource processing is finished, // regardless successful or not. The actual result of processing could be obtained from returned error. -func (sr *ScheduledResource) Wait(checkInterval time.Duration, timeout time.Duration, stopChan <-chan struct{}) (bool, error) { - ch := make(chan error, 1) - go func(ch chan error) { - log.Printf("%s flow: waiting for %v to be created", sr.context.graph.graphOptions.FlowName, sr.Key()) - if isResourceFinished(sr, ch) { - return - } - ticker := time.NewTicker(checkInterval) - for { - select { - case <-stopChan: - return - case <-ticker.C: - if isResourceFinished(sr, ch) { - return - } +func (sr *scheduledResource) wait(checkInterval time.Duration, timeout time.Duration, stopChan <-chan struct{}) (bool, error) { + log.Printf("%s flow: waiting for %v to be created", sr.context.graph.graphOptions.FlowName, sr.Key()) + var err error + var finished bool + if finished, err = isResourceFinished(sr); finished { + return false, err + } + ticker := time.NewTicker(checkInterval) + timeoutChan := time.After(timeout) + for { + select { + case <-stopChan: + return true, nil + case <-ticker.C: + if finished, err = isResourceFinished(sr); finished { + return false, err + } + case <-timeoutChan: + if err == nil { + err = fmt.Errorf("%s flow: timeout waiting for resource %s", sr.context.graph.graphOptions.FlowName, sr.Key()) } + return false, err } - - }(ch) - - select { - case <-stopChan: - return true, nil - case err := <-ch: - return false, err - case <-time.After(timeout): - e := fmt.Errorf("%s flow: timeout waiting for resource %s", sr.context.graph.graphOptions.FlowName, sr.Key()) - sr.Lock() - defer sr.Unlock() - sr.Error = e - return false, e } } @@ -139,11 +131,11 @@ func (sr *ScheduledResource) Wait(checkInterval time.Duration, timeout time.Dura // so that if resource becomes ready it stays in this status for the whole deployment duration. // Errors returned by the resource are never cached, however if AC sees permanent problem with resource it may set the // error field -func (sr *ScheduledResource) Status(meta map[string]string) (interfaces.ResourceStatus, error) { +func (sr *scheduledResource) Status(meta map[string]string) (interfaces.ResourceStatus, error) { sr.Lock() defer sr.Unlock() - if sr.status != "" || sr.Error != nil { - return sr.status, sr.Error + if sr.status != "" || sr.error != nil { + return sr.status, sr.error } status, err := sr.Resource.Status(meta) if err == nil && status == interfaces.ResourceReady { @@ -152,50 +144,64 @@ func (sr *ScheduledResource) Status(meta map[string]string) (interfaces.Resource return status, err } -// IsBlocked checks whether a scheduled resource can be created. It checks status of resources -// it depends on, via API -func (sr *ScheduledResource) IsBlocked() bool { - for _, reqKey := range sr.Requires { - meta := sr.Meta[reqKey] - _, onErrorSet := meta["on-error"] - req := sr.context.graph.graph[reqKey] +// isBlocked checks whether a scheduled resource can be created. It checks status of resources +// it depends on, via API. +func (sr *scheduledResource) isBlocked() bool { + isBlocked := false + var permanentlyBlockedKeys []string + skipped := true - status, err := req.Status(meta) + for _, reqKey := range sr.requires { + meta := sr.meta[reqKey] + onErrorValue, onErrorSet := meta["on-error"] + onErrorSet = onErrorSet && onErrorValue != "false" + req := sr.context.graph.graph[reqKey] req.RLock() - ignored := req.Ignored + ignored := req.ignored + permanentError := req.error req.RUnlock() - if err != nil && !onErrorSet && !ignored { - return true - } else if status == "ready" && onErrorSet { - return true - } else if err == nil && status != "ready" { - return true + if ignored { + continue + } + status, err := req.Status(meta) + if onErrorSet { + if permanentError == nil { + isBlocked = true + if err == nil && status == interfaces.ResourceReady { + permanentlyBlockedKeys = append(permanentlyBlockedKeys, reqKey) + } + } + } else { + skipped = false + if permanentError != nil || err != nil || status != interfaces.ResourceReady { + isBlocked = true + if permanentError != nil { + permanentlyBlockedKeys = append(permanentlyBlockedKeys, reqKey) + } + } } } - return false -} + if len(permanentlyBlockedKeys) > 0 { + sr.Lock() + sr.error = fmt.Errorf("permanently blocked because of (%s) dependencies", strings.Join(permanentlyBlockedKeys, ",")) + sr.skipped = skipped + sr.Unlock() + } -// ResetStatus resets cached status of scheduled resource -func (sr *ScheduledResource) ResetStatus() { - sr.Lock() - defer sr.Unlock() - sr.Error = nil - sr.status = "" + return isBlocked } -func createResources(toCreate chan *ScheduledResource, finished chan string, ccLimiter chan struct{}, stopChan <-chan struct{}) { +func createResources(toCreate chan *scheduledResource, finished chan<- *scheduledResource, ccLimiter chan struct{}, stopChan <-chan struct{}) { for r := range toCreate { - log.Printf("Requesting creation of %v", r.Key()) select { case <-stopChan: log.Println("Terminating creation of resources") return default: - log.Println("Deployment is not stopped, keep creating") } - go func(r *ScheduledResource, finished chan string, ccLimiter chan struct{}) { + go func(r *scheduledResource, finished chan<- *scheduledResource, ccLimiter chan struct{}) { // Acquire semaphore ccLimiter <- struct{}{} defer func() { @@ -212,18 +218,18 @@ func createResources(toCreate chan *ScheduledResource, finished chan string, ccL } for attemptNo := 1; attemptNo <= attempts; attemptNo++ { - - r.ResetStatus() - var err error // NOTE(gluke77): We start goroutines for dependencies // before the resource becomes ready, since dependencies // could have metadata defining their own readiness condition if attemptNo == 1 { - for _, reqKey := range r.RequiredBy { + for _, reqKey := range r.requiredBy { req := r.context.graph.graph[reqKey] - go func(req *ScheduledResource, toCreate chan *ScheduledResource) { + go func(req *scheduledResource, toCreate chan<- *scheduledResource) { + if req.requestCreation(toCreate) { + return + } ticker := time.NewTicker(CheckInterval) for { select { @@ -231,7 +237,7 @@ func createResources(toCreate chan *ScheduledResource, finished chan string, ccL log.Println("Terminating creation of dependencies") return case <-ticker.C: - if req.RequestCreation(toCreate) { + if req.requestCreation(toCreate) { return } } @@ -240,7 +246,7 @@ func createResources(toCreate chan *ScheduledResource, finished chan string, ccL } } - if attemptNo > 1 && (!r.Existing || r.context.graph.graphOptions.AllowDeleteExternalResources) { + if attemptNo > 1 && (!r.existing || r.context.graph.graphOptions.AllowDeleteExternalResources) { log.Printf("Trying to delete resource %s after previous unsuccessful attempt", r.Key()) err = r.Delete() if err != nil { @@ -249,11 +255,12 @@ func createResources(toCreate chan *ScheduledResource, finished chan string, ccL } r.RLock() - ignored := r.Ignored + ignored := r.ignored + srErr := r.error r.RUnlock() - if ignored { - log.Printf("Skipping creation of resource %s as being ignored", r.Key()) + if ignored || srErr != nil { + log.Printf("Skipping creation of resource %s", r.Key()) break } @@ -261,49 +268,53 @@ func createResources(toCreate chan *ScheduledResource, finished chan string, ccL err = r.Create() if err != nil { log.Printf("Error deploying resource %s: %v", r.Key(), err) - continue - } + } else { + log.Printf("Checking status for %s", r.Key()) - log.Printf("Checking status for %s", r.Key()) + var stopped bool + stopped, err = r.wait(CheckInterval, waitTimeout, stopChan) - stoped, err := r.Wait(CheckInterval, waitTimeout, stopChan) - - if stoped { - log.Printf("Received interrupt while waiting for %v. Exiting", r.Key()) - return - } + if stopped { + log.Printf("Received interrupt while waiting for %v. Exiting", r.Key()) + return + } - if err == nil { - log.Printf("Resource %s created", r.Key()) - break + if err == nil { + log.Printf("Resource %s created", r.Key()) + break + } + log.Printf("Resource %s was not created: %v", r.Key(), err) } - log.Printf("Resource %s was not created: %v", r.Key(), err) - - if attemptNo >= attempts { - if onError == "ignore" { + if attemptNo == attempts { + switch onError { + case "ignore": r.Lock() - r.Ignored = true + r.ignored = true log.Printf("Resource %s failure ignored -- prooceeding as normal", r.Key()) r.Unlock() - } else if onError == "ignore-all" { + case "ignore-all": ignoreAll(r) + default: + r.Lock() + r.error = err + r.Unlock() } } } - finished <- r.Key() + finished <- r }(r, finished, ccLimiter) } } -func ignoreAll(top *ScheduledResource) { +func ignoreAll(top *scheduledResource) { top.Lock() - top.Ignored = true + top.ignored = true top.Unlock() log.Printf("Marking resource %s as ignored", top.Key()) - for _, childKey := range top.RequiredBy { + for _, childKey := range top.requiredBy { child := top.context.graph.graph[childKey] ignoreAll(child) } @@ -315,8 +326,7 @@ func (depGraph dependencyGraph) Options() interfaces.DependencyGraphOptions { } // Deploy starts the deployment of a DependencyGraph -func (depGraph dependencyGraph) Deploy(stopChan <-chan struct{}) { - +func (depGraph dependencyGraph) Deploy(stopChan <-chan struct{}) bool { depCount := len(depGraph.graph) concurrency := depGraph.scheduler.concurrency @@ -326,8 +336,8 @@ func (depGraph dependencyGraph) Deploy(stopChan <-chan struct{}) { } ccLimiter := make(chan struct{}, concurrencyLimiterLen) - toCreate := make(chan *ScheduledResource, depCount) - created := make(chan string, depCount) + toCreate := make(chan *scheduledResource, depCount) + created := make(chan *scheduledResource, depCount) defer func() { close(toCreate) close(created) @@ -336,44 +346,48 @@ func (depGraph dependencyGraph) Deploy(stopChan <-chan struct{}) { go createResources(toCreate, created, ccLimiter, stopChan) for _, r := range depGraph.graph { - if len(r.Requires) == 0 { - r.RequestCreation(toCreate) + if len(r.requires) == 0 { + r.requestCreation(toCreate) } } log.Printf("%s flow: waiting for %d resource deployments", depGraph.graphOptions.FlowName, depCount) + result := true for i := 0; i < depCount; { select { case <-stopChan: log.Printf("Deployment of %s is stopped", depGraph.graphOptions.FlowName) - return - case <-created: + return false + case sr := <-created: + if sr.error != nil && !sr.skipped { + result = false + } i++ log.Printf("%s flow: %v out of %v were created", depGraph.graphOptions.FlowName, i, depCount) } } if depGraph.finalizer != nil { - depGraph.finalizer() + depGraph.finalizer(stopChan) } - // TODO Make sure every KO gets created eventually + return result } -// GetNodeReport acts as a more verbose version of IsBlocked. It performs the -// same check as IsBlocked, but returns the DeploymentReport -func (sr *ScheduledResource) GetNodeReport(name string) report.NodeReport { +// getNodeReport acts as a more verbose version of isBlocked. It performs the +// same check as isBlocked, but returns the DeploymentReport +func (sr *scheduledResource) getNodeReport(name string) report.NodeReport { var ready bool isBlocked := false - dependencies := make([]interfaces.DependencyReport, 0, len(sr.Requires)) + dependencies := make([]interfaces.DependencyReport, 0, len(sr.requires)) status, err := sr.Status(nil) if err != nil { ready = false } else { ready = status == "ready" } - for _, rKey := range sr.Requires { + for _, rKey := range sr.requires { r := sr.context.graph.graph[rKey] r.RLock() - meta := r.Meta[sr.Key()] + meta := r.meta[sr.Key()] depReport := r.GetDependencyReport(meta) r.RUnlock() if depReport.Blocks { @@ -397,7 +411,7 @@ func (depGraph dependencyGraph) GetStatus() (interfaces.DeploymentStatus, interf var status interfaces.DeploymentStatus deploymentReport := make(report.DeploymentReport, 0, len(depGraph.graph)) for key, resource := range depGraph.graph { - depReport := resource.GetNodeReport(key) + depReport := resource.getNodeReport(key) deploymentReport = append(deploymentReport, depReport) if depReport.Ready { readyExist = true @@ -418,7 +432,7 @@ func (depGraph dependencyGraph) GetStatus() (interfaces.DeploymentStatus, interf return status, deploymentReport } -func runConcurrently(funcs []func() bool, concurrency int) bool { +func runConcurrently(funcs []func(<-chan struct{}) bool, concurrency int, stopChan <-chan struct{}) bool { if concurrency < 1 { concurrency = len(funcs) } @@ -430,12 +444,18 @@ func runConcurrently(funcs []func() bool, concurrency int) bool { for _, f := range funcs { sem <- true - go func(foo func() bool) { - defer func() { <-sem }() - if !foo() { - result = false - } - }(f) + select { + case <-stopChan: + result = false + <-sem + default: + go func(foo func(<-chan struct{}) bool) { + defer func() { <-sem }() + if !foo(stopChan) { + result = false + } + }(f) + } } for i := 0; i < cap(sem); i++ { @@ -444,8 +464,8 @@ func runConcurrently(funcs []func() bool, concurrency int) bool { return result } -func deleteResource(resource *ScheduledResource) error { - if !resource.Existing || resource.context.graph.graphOptions.AllowDeleteExternalResources { +func deleteResource(resource *scheduledResource) error { + if !resource.existing || resource.context.graph.graphOptions.AllowDeleteExternalResources { log.Printf("%s flow: Deleting resource %s", resource.context.flow.Name, resource.Key()) err := resource.Delete() if err != nil { diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index 2176b3f..484b56e 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -64,19 +64,19 @@ func TestBuildDependencyGraph(t *testing.T) { "pod/ready-1", sr.Key()) } - if len(sr.Requires) != 0 { + if len(sr.requires) != 0 { t.Errorf("wrong length of 'Requires' for scheduled resource '%s', expected %d, actual %d", - sr.Key(), 0, len(sr.Requires)) + sr.Key(), 0, len(sr.requires)) } - if len(sr.RequiredBy) != 1 { + if len(sr.requiredBy) != 1 { t.Errorf("wrong length of 'RequiredBy' for scheduled resource '%s', expected %d, actual %d", - sr.Key(), 1, len(sr.Requires)) + sr.Key(), 1, len(sr.requires)) } - if sr.RequiredBy[0] != "pod/ready-2" { + if sr.requiredBy[0] != "pod/ready-2" { t.Errorf("wrong value of 'RequiredBy' for scheduled resource '%s', expected '%s', actual '%s'", - sr.Key(), "pod/ready-2", sr.RequiredBy[0]) + sr.Key(), "pod/ready-2", sr.requiredBy[0]) return } @@ -92,19 +92,19 @@ func TestBuildDependencyGraph(t *testing.T) { "pod/ready-2", sr.Key()) } - if len(sr.Requires) != 1 { + if len(sr.requires) != 1 { t.Errorf("wrong length of 'Requires' for scheduled resource '%s', expected %d, actual %d", - sr.Key(), 1, len(sr.Requires)) + sr.Key(), 1, len(sr.requires)) } - if sr.Requires[0] != "pod/ready-1" { + if sr.requires[0] != "pod/ready-1" { t.Errorf("wrong value of 'Requires' for scheduled resource '%s', expected '%s', actual '%s'", - sr.Key(), "pod/ready-1", sr.Requires[0]) + sr.Key(), "pod/ready-1", sr.requires[0]) } - if len(sr.RequiredBy) != 0 { + if len(sr.requiredBy) != 0 { t.Errorf("wrong length of 'RequiredBy' for scheduled resource '%s', expected %d, actual %d", - sr.Key(), 0, len(sr.Requires)) + sr.Key(), 0, len(sr.requires)) } } @@ -112,47 +112,47 @@ func TestIsBlocked(t *testing.T) { depGraph := newDependencyGraph(nil, interfaces.DependencyGraphOptions{}) context := &graphContext{graph: depGraph} - one := &ScheduledResource{ + one := &scheduledResource{ Resource: report.SimpleReporter{BaseResource: mocks.NewResource("fake1", "not ready")}, - Meta: map[string]map[string]string{}, + meta: map[string]map[string]string{}, context: context, } depGraph.graph["fake1"] = one - if one.IsBlocked() { + if one.isBlocked() { t.Error("scheduled resource is blocked but it must not") } - two := &ScheduledResource{ + two := &scheduledResource{ Resource: report.SimpleReporter{BaseResource: mocks.NewResource("fake2", "ready")}, - Meta: map[string]map[string]string{}, + meta: map[string]map[string]string{}, context: context, } depGraph.graph["fake2"] = two - one.Requires = []string{"fake2"} + one.requires = []string{"fake2"} - if one.IsBlocked() { + if one.isBlocked() { t.Error("scheduled resource is blocked but it must not") } - two.Error = errors.New("non-nil error") - if !one.IsBlocked() { + two.error = errors.New("non-nil error") + if !one.isBlocked() { t.Error("scheduled resource is not blocked but it must be") } - depGraph.graph["fake3"] = &ScheduledResource{ + depGraph.graph["fake3"] = &scheduledResource{ Resource: report.SimpleReporter{mocks.NewResource("fake3", "not ready")}, - Meta: map[string]map[string]string{}, + meta: map[string]map[string]string{}, context: context, } - two.Error = nil - one.Requires = append(one.Requires, "fake3") + two.error = nil + one.requires = append(one.requires, "fake3") - if !one.IsBlocked() { + if !one.isBlocked() { t.Error("scheduled resource is not blocked but it must be") } } @@ -169,33 +169,33 @@ func TestIsBlockedWithOnErrorDependency(t *testing.T) { depGraph := newDependencyGraph(nil, interfaces.DependencyGraphOptions{}) context := &graphContext{graph: depGraph} - one := &ScheduledResource{ + one := &scheduledResource{ Resource: report.SimpleReporter{BaseResource: mocks.NewResource("fake1", "not ready")}, - Meta: map[string]map[string]string{}, + meta: map[string]map[string]string{}, context: context, } depGraph.graph["fake1"] = one - if one.IsBlocked() { + if one.isBlocked() { t.Error("scheduled resource is blocked but it must be not") } - two := &ScheduledResource{ + two := &scheduledResource{ Resource: report.SimpleReporter{BaseResource: mocks.NewResource("fake2", "not ready")}, - Meta: map[string]map[string]string{}, + meta: map[string]map[string]string{}, context: context, } depGraph.graph["fake2"] = two - one.Requires = []string{"fake2"} - one.Meta["fake2"] = map[string]string{"on-error": "true"} + one.requires = []string{"fake2"} + one.meta["fake2"] = map[string]string{"on-error": "true"} - if !one.IsBlocked() { + if !one.isBlocked() { t.Error("scheduled resource is not blocked but it must be") } - two.Error = errors.New("non-nil error") - if one.IsBlocked() { + two.error = errors.New("non-nil error") + if one.isBlocked() { t.Error("scheduled resource is blocked but it must be not") } } @@ -349,7 +349,7 @@ func TestLimitConcurrency(t *testing.T) { func TestStopBeforeDeploymentStarted(t *testing.T) { depGraph := newDependencyGraph(&scheduler{}, interfaces.DependencyGraphOptions{}) - sr := &ScheduledResource{ + sr := &scheduledResource{ Resource: report.SimpleReporter{BaseResource: mocks.NewResource("fake1", "not ready")}, } depGraph.graph[sr.Key()] = sr @@ -546,18 +546,18 @@ func TestGraph(t *testing.T) { } } -func makeTaskFunc(i int32, res bool, acc *int32) func() bool { - return func() bool { +func makeTaskFunc(i int32, res bool, acc *int32) func(<-chan struct{}) bool { + return func(<-chan struct{}) bool { time.Sleep(time.Second / 10) atomic.AddInt32(acc, i) return res } } -func runTaskFuncs(t *testing.T, funcs []func() bool, concurrency int, expectedSum int32, expectedResult bool, threshold float64, acc *int32) { +func runTaskFuncs(t *testing.T, funcs []func(<-chan struct{}) bool, concurrency int, expectedSum int32, expectedResult bool, threshold float64, acc *int32) { *acc = 0 before := time.Now() - res := runConcurrently(funcs, concurrency) + res := runConcurrently(funcs, concurrency, nil) after := time.Now() if *acc != expectedSum { t.Errorf("runConcurrently failed: %d != %d", expectedSum, acc) @@ -576,7 +576,7 @@ func runTaskFuncs(t *testing.T, funcs []func() bool, concurrency int, expectedSu // TestRunConcurrently tests runConcurrently function which runs list of concurrent tasks func TestRunConcurrently(t *testing.T) { var acc int32 - var funcs []func() bool + var funcs []func(<-chan struct{}) bool for i := int32(1); i <= 50; i++ { funcs = append(funcs, makeTaskFunc(i, true, &acc)) } @@ -604,9 +604,9 @@ func TestWaitWithZeroTimeout(t *testing.T) { defer close(stopChan) now := time.Now() - res, err := sr.Wait(CheckInterval, 0, stopChan) + res, err := sr.wait(CheckInterval, 0, stopChan) if res { - t.Error("Wait() succeded") + t.Error("wait() succeded") } if err == nil { t.Error("No error was returned") @@ -615,11 +615,8 @@ func TestWaitWithZeroTimeout(t *testing.T) { if err.Error() != expectedMessage { t.Error("Got unexpected error:", err) } - if sr.Error != err { - t.Error("ScheduledResource was not marked as permanently failed") - } } if time.Now().Sub(now) >= time.Second { - t.Error("Wait() was running for too long") + t.Error("wait() was running for too long") } }