Skip to content

Commit c947670

Browse files
committed
adding parallelism
1 parent cee186b commit c947670

3 files changed

Lines changed: 145 additions & 39 deletions

File tree

kubelink/config/GlobalConfig.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,10 @@ import "github.com/caarlos0/env"
2222
type HelmReleaseConfig struct {
2323
EnableHelmReleaseCache bool `env:"ENABLE_HELM_RELEASE_CACHE" envDefault:"true" description:"Enable helm releases list cache" deprecated:"false" example:"true"`
2424
MaxCountForHelmRelease int `env:"MAX_COUNT_FOR_HELM_RELEASE" envDefault:"20" description:"Max count for helm release history list" deprecated:"false" example:"20"`
25-
ManifestFetchBatchSize int `env:"MANIFEST_FETCH_BATCH_SIZE" envDefault:"2" description:"Manifest fetch parallelism batch size (applied only for parent objects)" deprecated:"false" example:"2"`
25+
ManifestFetchBatchSize int `env:"MANIFEST_FETCH_BATCH_SIZE" envDefault:"10" description:"Manifest fetch parallelism batch size (applied for parent objects)" deprecated:"false" example:"10"`
2626
RunHelmInstallInAsyncMode bool `env:"RUN_HELM_INSTALL_IN_ASYNC_MODE" envDefault:"false" description:"Run helm install/ upgrade in async mode" deprecated:"false" example:"false"`
2727
ChartWorkingDirectory string `env:"CHART_WORKING_DIRECTORY" envDefault:"/home/devtron/devtroncd/charts/" description:"Helm charts working directory" deprecated:"false" example:"/home/devtron/devtroncd/charts/"`
28-
BuildNodesBatchSize int `env:"BUILD_NODES_BATCH_SIZE" envDefault:"2" description:"Resource tree build nodes parallelism batch size (applied only for depth-1 child objects of a parent object)" deprecated:"false" example:"2"`
28+
BuildNodesBatchSize int `env:"BUILD_NODES_BATCH_SIZE" envDefault:"10" description:"Resource tree build nodes parallelism batch size; controls depth-1 worker pool size and the shared semaphore size for all deeper levels" deprecated:"false" example:"10"`
2929
FeatChildChildObjectListingPaginationEnable bool `env:"FEAT_CHILD_OBJECT_LISTING_PAGINATION" envDefault:"true" description:"use pagination in listing all the dependent child objects. use 'CHILD_OBJECT_LISTING_PAGE_SIZE' to set the page size." deprecated:"false" example:"true"`
3030
}
3131

kubelink/pkg/service/commonHelmService/bean.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ type BuildNodesRequest struct {
4848
RestConfig *rest.Config
4949
ReleaseNamespace string
5050
ParentResourceRef *commonBean.ResourceRef
51+
concurrencySem chan struct{} // shared semaphore for bounded all-depth parallelism in child node building
5152
}
5253

5354
func NewBuildNodesRequest(buildNodesConfig *BuildNodesRequest) *BuildNodesConfig {
@@ -117,6 +118,11 @@ func (req *BuildNodesRequest) WithParentResourceRef(parentResourceRef *commonBea
117118
return req
118119
}
119120

121+
func (req *BuildNodesRequest) WithSemaphore(sem chan struct{}) *BuildNodesRequest {
122+
req.concurrencySem = sem
123+
return req
124+
}
125+
120126
type BuildNodeResponse struct {
121127
Nodes []*commonBean.ResourceNode
122128
HealthStatusArray []*commonBean.HealthStatus

kubelink/pkg/service/commonHelmService/resourceTreeService.go

Lines changed: 137 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"k8s.io/apimachinery/pkg/runtime/schema"
2121
"k8s.io/client-go/rest"
2222
"net/http"
23+
"sync"
2324
)
2425

2526
type ResourceTreeServiceImpl struct {
@@ -74,9 +75,18 @@ func sanitizeParentObjects(parentObjects []*client.ObjectIdentifier) []*client.O
7475
func (impl *ResourceTreeServiceImpl) BuildResourceTreeUsingK8s(ctx context.Context, appDetailRequest *client.AppDetailRequest, conf *rest.Config, parentObjects []*client.ObjectIdentifier) (*bean.ResourceTreeResponse, error) {
7576
liveManifests := impl.getLiveManifestsForGVKList(conf, parentObjects)
7677

78+
// shared semaphore bounds the extra goroutines spawned for child node building at depth 2 and deeper (depth 1 is bounded by the batch worker pool);
79+
// sized to BuildNodesBatchSize so the combined in-flight calls (parent batch + child sem) stay within safe API server limits
80+
semSize := impl.helmReleaseConfig.BuildNodesBatchSize
81+
if semSize <= 0 {
82+
semSize = 1
83+
}
84+
sem := make(chan struct{}, semSize)
85+
7786
// build resource Nodes
7887
req := NewBuildNodesRequest(NewBuildNodesConfig(conf).
79-
WithReleaseNamespace(appDetailRequest.Namespace)).
88+
WithReleaseNamespace(appDetailRequest.Namespace).
89+
WithSemaphore(sem)).
8090
WithDesiredOrLiveManifests(liveManifests...).
8191
WithBatchWorker(impl.helmReleaseConfig.BuildNodesBatchSize, impl.logger)
8292
buildNodesResponse, err := impl.BuildNodes(req)
@@ -106,36 +116,60 @@ func (impl *ResourceTreeServiceImpl) BuildResourceTreeUsingK8s(ctx context.Conte
106116
}
107117

108118
func (impl *ResourceTreeServiceImpl) getLiveManifestsForGVKList(restConfig *rest.Config, gvkList []*client.ObjectIdentifier) []*bean.DesiredOrLiveManifest {
109-
var manifests []*bean.DesiredOrLiveManifest
110-
for _, resource := range gvkList {
111-
gvk := &schema.GroupVersionKind{
112-
Group: resource.GetGroup(),
113-
Version: resource.GetVersion(),
114-
Kind: resource.GetKind(),
119+
total := len(gvkList)
120+
if total == 0 {
121+
return nil
122+
}
123+
// pre-allocate by index so concurrent writes to distinct slots are safe without a mutex
124+
manifests := make([]*bean.DesiredOrLiveManifest, total)
125+
126+
batchSize := impl.helmReleaseConfig.ManifestFetchBatchSize
127+
if batchSize <= 0 {
128+
batchSize = 1
129+
}
130+
131+
for i := 0; i < total; {
132+
currentBatch := batchSize
133+
if remaining := total - i; remaining < currentBatch {
134+
currentBatch = remaining
115135
}
116-
manifest, _, err := impl.k8sService.GetLiveManifest(restConfig, resource.GetNamespace(), gvk, resource.GetName())
117-
if err != nil {
118-
impl.logger.Errorw("Error in getting live manifest", "err", err)
119-
statusError, _ := err.(*errors2.StatusError)
120-
desiredManifest := &unstructured.Unstructured{}
121-
desiredManifest.SetGroupVersionKind(*gvk)
122-
desiredManifest.SetName(resource.Name)
123-
desiredManifest.SetNamespace(resource.Namespace)
124-
desiredManifest.SetAnnotations(resource.Annotations)
125-
desiredOrLiveManifest := &bean.DesiredOrLiveManifest{
126-
Manifest: desiredManifest,
127-
// using deep copy as it replaces item in manifest in loop
128-
IsLiveManifestFetchError: true,
129-
}
130-
if statusError != nil {
131-
desiredOrLiveManifest.LiveManifestFetchErrorCode = statusError.Status().Code
132-
}
133-
manifests = append(manifests, desiredOrLiveManifest)
134-
} else {
135-
manifests = append(manifests, &bean.DesiredOrLiveManifest{
136-
Manifest: manifest,
137-
})
136+
var wg sync.WaitGroup
137+
for j := 0; j < currentBatch; j++ {
138+
wg.Add(1)
139+
go func(idx int) {
140+
defer wg.Done()
141+
resource := gvkList[idx]
142+
gvk := &schema.GroupVersionKind{
143+
Group: resource.GetGroup(),
144+
Version: resource.GetVersion(),
145+
Kind: resource.GetKind(),
146+
}
147+
manifest, _, err := impl.k8sService.GetLiveManifest(restConfig, resource.GetNamespace(), gvk, resource.GetName())
148+
if err != nil {
149+
impl.logger.Errorw("Error in getting live manifest", "err", err)
150+
statusError, _ := err.(*errors2.StatusError)
151+
desiredManifest := &unstructured.Unstructured{}
152+
desiredManifest.SetGroupVersionKind(*gvk)
153+
desiredManifest.SetName(resource.Name)
154+
desiredManifest.SetNamespace(resource.Namespace)
155+
desiredManifest.SetAnnotations(resource.Annotations)
156+
desiredOrLiveManifest := &bean.DesiredOrLiveManifest{
157+
Manifest: desiredManifest,
158+
IsLiveManifestFetchError: true,
159+
}
160+
if statusError != nil {
161+
desiredOrLiveManifest.LiveManifestFetchErrorCode = statusError.Status().Code
162+
}
163+
manifests[idx] = desiredOrLiveManifest
164+
} else {
165+
manifests[idx] = &bean.DesiredOrLiveManifest{
166+
Manifest: manifest,
167+
}
168+
}
169+
}(i + j)
138170
}
171+
wg.Wait()
172+
i += currentBatch
139173
}
140174
return manifests
141175
}
@@ -167,19 +201,20 @@ func (impl *ResourceTreeServiceImpl) BuildNodes(request *BuildNodesConfig) (*Bui
167201
response.HealthStatusArray = append(response.HealthStatusArray, getNodesFromManifestResponse.Node.Health)
168202
}
169203

170-
// add child Nodes request
204+
// add child Nodes request; propagate the shared semaphore so all-depth parallelism is bounded
171205
if len(getNodesFromManifestResponse.DesiredOrLiveChildrenManifests) > 0 {
172206
req := NewBuildNodesRequest(NewBuildNodesConfig(request.RestConfig).
173207
WithReleaseNamespace(request.ReleaseNamespace).
174-
WithParentResourceRef(getNodesFromManifestResponse.ResourceRef)).
208+
WithParentResourceRef(getNodesFromManifestResponse.ResourceRef).
209+
WithSemaphore(request.concurrencySem)).
175210
WithDesiredOrLiveManifests(getNodesFromManifestResponse.DesiredOrLiveChildrenManifests...)
176-
// NOTE: Do not use batch worker for child Nodes as it will create batch worker recursively
211+
// NOTE: Do not attach a new batch worker for child Nodes to avoid recursive pool creation
177212
buildChildNodesRequests = append(buildChildNodesRequests, req)
178213
}
179214
}
180215
// build child Nodes, if any.
181216
// NOTE: build child Nodes calls buildNodes recursively
182-
childNodeResponse, err := impl.buildChildNodesInBatch(request.batchWorker, buildChildNodesRequests)
217+
childNodeResponse, err := impl.buildChildNodesInBatch(request.batchWorker, request.concurrencySem, buildChildNodesRequests)
183218
if err != nil {
184219
return response, err
185220
}
@@ -205,11 +240,16 @@ func (impl *ResourceTreeServiceImpl) buildChildNodes(buildChildNodesRequests []*
205240
}
206241

207242
// buildChildNodesInBatch builds child Nodes in parallel from desired or live manifest.
208-
// - It uses batch workers workerPool.WorkerPool[*BuildNodeResponse] to build child Nodes in parallel.
209-
// - If workerPool is not defined, it builds child Nodes sequentially.
210-
func (impl *ResourceTreeServiceImpl) buildChildNodesInBatch(wp *workerPool.WorkerPool[*BuildNodeResponse], buildChildNodesRequests []*BuildNodesConfig) (*BuildNodeResponse, error) {
243+
// - If a workerPool is provided (depth-1), it uses it for parallel execution.
244+
// - If no workerPool but a semaphore is provided (depth-2+), it uses semaphore-bounded goroutines.
245+
// - Falls back to sequential when neither is available.
246+
func (impl *ResourceTreeServiceImpl) buildChildNodesInBatch(wp *workerPool.WorkerPool[*BuildNodeResponse], sem chan struct{}, buildChildNodesRequests []*BuildNodesConfig) (*BuildNodeResponse, error) {
211247
if wp == nil {
212-
// build child Nodes sequentially
248+
if sem != nil {
249+
// depth-2+ parallel path: bounded by shared semaphore
250+
return impl.buildChildNodesWithSemaphore(sem, buildChildNodesRequests)
251+
}
252+
// no concurrency available; build child Nodes sequentially
213253
return impl.buildChildNodes(buildChildNodesRequests)
214254
}
215255
response := NewBuildNodeResponse()
@@ -235,6 +275,66 @@ func (impl *ResourceTreeServiceImpl) buildChildNodesInBatch(wp *workerPool.Worke
235275
return response, nil
236276
}
237277

278+
// buildChildNodesWithSemaphore builds child Nodes in parallel using a shared semaphore for bounded concurrency.
279+
// It uses a non-blocking semaphore acquisition: if the semaphore is full the request is processed sequentially
280+
// in the calling goroutine, preventing deadlocks during recursive invocations where the caller already holds a slot.
281+
func (impl *ResourceTreeServiceImpl) buildChildNodesWithSemaphore(sem chan struct{}, buildChildNodesRequests []*BuildNodesConfig) (*BuildNodeResponse, error) {
282+
response := NewBuildNodeResponse()
283+
var mu sync.Mutex
284+
var wg sync.WaitGroup
285+
// buffered channel of size 1 captures the first error; subsequent errors are dropped
286+
errCh := make(chan error, 1)
287+
288+
for _, req := range buildChildNodesRequests {
289+
// check for an earlier error before launching more work
290+
select {
291+
case err := <-errCh:
292+
wg.Wait()
293+
return response, err
294+
default:
295+
}
296+
297+
// try to acquire a semaphore slot without blocking
298+
select {
299+
case sem <- struct{}{}:
300+
// slot acquired — run in a goroutine
301+
wg.Add(1)
302+
go func(r *BuildNodesConfig) {
303+
defer wg.Done()
304+
defer func() { <-sem }() // release slot when done
305+
childResp, err := impl.BuildNodes(r)
306+
if err != nil {
307+
impl.logger.Errorw("error in building child Nodes", "ReleaseNamespace", r.ReleaseNamespace, "parentResource", r.ParentResourceRef.GetGvk(), "err", err)
308+
select {
309+
case errCh <- err:
310+
default:
311+
}
312+
return
313+
}
314+
mu.Lock()
315+
response.WithNodes(childResp.Nodes).WithHealthStatusArray(childResp.HealthStatusArray)
316+
mu.Unlock()
317+
}(req)
318+
default:
319+
// semaphore full — process sequentially in the calling goroutine to avoid deadlock
320+
childResp, err := impl.BuildNodes(req)
321+
if err != nil {
322+
impl.logger.Errorw("error in building child Nodes sequentially", "ReleaseNamespace", req.ReleaseNamespace, "err", err)
323+
wg.Wait()
324+
return response, err
325+
}
326+
response.WithNodes(childResp.Nodes).WithHealthStatusArray(childResp.HealthStatusArray)
327+
}
328+
}
329+
wg.Wait()
330+
select {
331+
case err := <-errCh:
332+
return response, err
333+
default:
334+
return response, nil
335+
}
336+
}
337+
238338
func (impl *ResourceTreeServiceImpl) getNodeFromDesiredOrLiveManifest(request *GetNodeFromManifestRequest) (*GetNodeFromManifestResponse, error) {
239339
response := NewGetNodesFromManifestResponse()
240340
manifest := request.DesiredOrLiveManifest.Manifest

0 commit comments

Comments
 (0)