feat(transfer): Optimized approach for OCM transfer by implementing a concurrent worker pool.#1420
Conversation
Optimized approach for OCM transfer by implementing a concurrent worker pool, replacing the previous sequential transfer logic.
jakobmoellerdev
left a comment
There was a problem hiding this comment.
Thanks for the contribution.
Please adjust the following:
- Ensure Context Cancellation is still possible
- Ensure tests are green
- Please keep existing comments and keep your change set minimal for the change needed.
- Please consider what happens if children have joint dependencies
Component A
=> Component B
=> Component C
Component B
=> Component D
Component C
=> Component D
Currently you are triggering multiple transfers for Component D which is incorrect.
Cheers!
|
Thanks for the feedback! I have a few questions regarding the points you've raised:
|
I expect that the worker pool can be cancelled if a parent context is cancelled. you removed the parent context right now. I think you can just add this back in and create a new subcontext: https://github.com/open-component-model/ocm/pull/1420/files#diff-d341af7fc25e4e0ea76361047d0e578a1e197d1e598fcba3ffc5ce866c42358dL38
It is imperative that the transfer for a single component version in the handler happens at most once per transfer attempt. Retries are handled within there, so we should avoid triggering it multiple times. There are a few ways you can do this, for example with mutexes or synced maps.
See above :) One other thing: If the transfer of one component version fails, we should cancel all other active transfers. For that you should possibly use an error group from the sync package |
Updated existing implementation with new changes
| } | ||
|
|
||
| // subp := common.AddPrinterGap(ctx, " ") | ||
| var wg sync.WaitGroup |
There was a problem hiding this comment.
I just noticed this while taking a look here again, but this is not implemented as described in the PR:
Enabled parallel transfer of artifacts using a concurrent worker pool (default size: 5 workers, configurable)
Tuned worker pool size for optimal performance and resource utilization.
You should make this sync group limited and configurable
There was a problem hiding this comment.
This comment is still not resolved
There was a problem hiding this comment.
Is the expectation here that once one of the go routine fails, all others should abort? if so, then you should use an errgroup.WithContext. WDYT?
| func CopyVersionWithContext(cctx context.Context, log logging.Logger, hist common.History, src ocm.ComponentVersionAccess, t ocm.ComponentVersionAccess, handler TransferHandler) (rerr error) { | ||
| return copyVersion(cctx, common.GetPrinter(cctx), log, hist, src, t, src.GetDescriptor().Copy(), handler) | ||
| func CopyVersionWithContext(cctx context.Context, printer common.Printer, log logging.Logger, hist common.History, src ocm.ComponentVersionAccess, t ocm.ComponentVersionAccess, handler TransferHandler) (rerr error) { | ||
| return copyVersionWithWorkerPool(cctx, printer, log, hist, src, t, src.GetDescriptor().Copy(), handler, 5) |
There was a problem hiding this comment.
The 5 workers seem very specific to your trial run and should not be used for OCM in general.
I have 2 suggestions for possible value setting:
- Default to a runtime-dynamic value, for example
runtime.NumCPU()that defaults to active discovered CPUs by the GO runtime - Default to a sequential value, i.e. 1
Choose whichever you prefer.
That being said, to properly make it configurable like you describe in your PR, I am missing exactly that: a configurable flag. As such you will need to introduce both a setting/option as well as a CLI flag for concurrency. Ideally this should also be configurable in .ocmconfig. I don't mind for now if you want to have this in separate PRs but imo the merge should not happen without ANY configuration ability.
Thanks!
| i, r := i, r | ||
| tasks <- transferTask{ | ||
| id: fmt.Sprintf("resource-%d", i), | ||
| task: func() error { |
There was a problem hiding this comment.
This functional transfer task is very hard to read and debug, so I suggest you split this into a separate function.
|
Hi @jakobmoellerdev, Thank you for the review and feedback ! I'll look into the suggestions you provided. In the meantime, I wanted to raise a concern I noticed while testing the OCM transfer process locally using the latest version of the OCM codebase. When I run it after cloning, I consistently encounter the following error: Error: transfer resources and sources: commit and ref cannot be specified at the same time This error appears even when using the latest OCM transfer.go code (without my changes), and although the transfer process completes, the blobs are not getting transferred to the JFrog repository.
It would be great if someone could help verify this behavior and share any insights. |
|
There is a related change I did quite some time back that influenced the github access method: #1406 i introduced this limitation because ref and commit are mutually exclusive to the access method. Im assuming you have incorrect specs in your component version |
Updated existing code with new changes based on the previous feedbacks
|
Hi @jakobmoellerdev , Thank you for the previous review and inputs! I've made the necessary changes based on your comments. Could you please review the latest changes and let me know if everything looks good? Additionally, I wanted to follow up regarding the issue I'm facing during local testing - the blobs are still not getting transferred and I'm consistently seeing the following error: transfer resources and sources: commit and ref cannot be specified at the same time You also suggested that the issue could be due to incorrect specs in the component version. Looking forward to your input. Thanks again! |
|
The issue you were facing is a regression we reverted now in the latest RCs and on main, you shouldnt have this error anymore. :) |
|
Hi @jakobmoellerdev, There are a couple of checks that are failing, and I’d appreciate your guidance on how to proceed:
Could you please share some insights or point me in the right direction to resolve these? Thanks in advance! |
jakobmoellerdev
left a comment
There was a problem hiding this comment.
Regarding the lint error. you shoud fixup your import ordering on the project such that
gci:
sections:
- standard
- blank
- dot
- default
- prefix(ocm.software/ocm)
custom-order: true
is set. your file edit breaks these rules. please see the GCI lint documentation on how to fix that and your IDE documentation on how to configure import ordering. You can also run golangci-lint in --fix mode to correct this automatically.
Regarding the test run:
you are changing the way that notifyArtifactInfo is called in your transfer logic.
In your code https://github.com/open-component-model/ocm/pull/1420/files#diff-d341af7fc25e4e0ea76361047d0e578a1e197d1e598fcba3ffc5ce866c42358dR316 I am assuming that you changed how changed and valueNeeded are evaluated and that causes a different handling. Please double check that the behavior on the breaking test is the same as before your change.
Before:
var old compdesc.Resource
hint := ocmcpi.ArtifactNameHint(a, src)
old, err = cur.GetResourceByIdentity(r.Meta().GetIdentity(srccd.Resources))
changed := err != nil || old.Digest == nil || !old.Digest.Equal(r.Meta().Digest)
valueNeeded := err == nil && needsTransport(src.GetContext(), r, &old)
if changed || valueNeeded {
var msgs []interface{}
if !errors.IsErrNotFound(err) {
if err != nil {
return err
}
if !changed && valueNeeded {
msgs = []interface{}{"copy"}
} else {
msgs = []interface{}{"overwrite"}
}
}
notifyArtifactInfo(printer, log, "resource", i, r.Meta(), hint, msgs...)
err = handler.HandleTransferResource(r, m, hint, t)
} else {
if err == nil { // old resource found -> keep current access method
t.SetResource(r.Meta(), old.Access, ocm.ModifyElement(), ocm.SkipVerify(), ocm.DisableExtraIdentityDefaulting())
}
notifyArtifactInfo(printer, log, "resource", i, r.Meta(), hint, "already present")
}
after
hint := ocmcpi.ArtifactNameHint(a, src)
old, err := cur.GetResourceByIdentity(r.Meta().GetIdentity(srccd.Resources))
changed := err != nil || old.Digest == nil || !old.Digest.Equal(r.Meta().Digest)
valueNeeded := err == nil && needsTransport(src.GetContext(), r, &old)
if changed || valueNeeded {
notifyArtifactInfo(printer, log, "resource", i, r.Meta(), hint, "copy")
return handler.HandleTransferResource(r, m, hint, t)
} else if err == nil {
t.SetResource(r.Meta(), old.Access, ocm.ModifyElement(), ocm.SkipVerify(), ocm.DisableExtraIdentityDefaulting())
notifyArtifactInfo(printer, log, "resource", i, r.Meta(), hint, "already present")
}
| } | ||
|
|
||
| // subp := common.AddPrinterGap(ctx, " ") | ||
| var wg sync.WaitGroup |
There was a problem hiding this comment.
This comment is still not resolved
|
|
||
| nested := finalize.Nested() | ||
| log.Info(" transferring resources and sources using worker pool", "workers", maxWorkers) | ||
| tasks := make(chan transferTask) |
There was a problem hiding this comment.
can you not make the tasks a bounded channel too?
There was a problem hiding this comment.
Yes, absolutely! I've now updated the tasks channel to be a bounded (buffered) channel within the copyVersionWithWorkerPool function.
This helps to:
- Decouple the task producer from the workers slightly.
- Smooth out throughput by allowing a small backlog of tasks to accumulate.
- Control memory usage by limiting the number of in-flight tasks.
The buffer size is set to maxWorkers * 2 (with a minimum of 1), which provides a good balance.
Here's the updated line in copyVersionWithWorkerPool:
// ...
log.Info(" transferring resources and sources using worker pool", "workers", maxWorkers)
// Make tasks a bounded (buffered) channel to smooth out task distribution.
taskBufferSize := maxWorkers * 2
if taskBufferSize < 1 { // Ensure a minimum buffer size
taskBufferSize = 1
}
tasks := make(chan transferTask, taskBufferSize) // <-- Now a buffered channel
errChan := make(chan error, len(src.GetResources())+len(src.GetSources()))
// ...
Thanks for the suggestion!
There was a problem hiding this comment.
I do not think it is necessary for you to introduce an unknown buffer size. instead you can introduce a goroutine pool that is buffered to the number of resources+sources no? (same as the errChan)
| } | ||
|
|
||
| go func() { | ||
| for i, r := range src.GetResources() { |
There was a problem hiding this comment.
why do you need to run a go routine to submit transfer tasks to the pool?
There was a problem hiding this comment.
The task submission loop (go func() { ... tasks <- ... close(tasks) }) runs in its own goroutine primarily to:
- Avoid Deadlock: If it ran in the main goroutine, sending tasks could block (if the channel is full), preventing wg.Wait() and close(tasks) from being reached.
- Proper Channel Closure: Ensures close(tasks) is called only after all tasks have been submitted, allowing worker range loops to terminate correctly.
This pattern allows the task producer to run concurrently with the workers and manage channel lifecycle gracefully.
There was a problem hiding this comment.
I understand why in general a submission is done in a goroutine. However, im wondering why you need it. You know the length of elements in the channel in advance so it is always buffered correctly and can never lock. as such you do not have a deadlock. that also simplifies channel closure.
|
Hi @jakobmoellerdev , could you please trigger all the remaining test cases ? |
|
Hey there: tests/lint still failing, PTAL. Also please sign and make sure DCO is passing. also run go generate as there are diffs. Thanks! |
Signed-off-by: Archanaa.N <archanaa.n@sap.com>
|
I think you broke the lint because you touched every single go file in the repo and removed a separator |
|
@dynamic-Archu can you pls try to sign all the commits , which can help us to remove 1 issue |
|
folks if you dont mind I think someone from the OCM team should come in here, pick this up and help you get this in. Its been sitting very long and we really want to have this |
|
Hey @Umang2608 , I tried doing the DCO signing earlier using the rebase command, but it didn’t work out on my end. Since I’ve already given you access to the repo, please feel free to handle the signing if that’s easier from your side. I also noticed that the OCM team has started making changes and restructuring things, so I’m not sure if it would be ideal for me to modify anything right now. Please let me know if you’d like me to proceed or hold off for the moment. |
|
Thanks, @jakobmoellerdev ! Sounds good , I’d really appreciate the OCM team’s help in getting this finalized. Please feel free to make the necessary updates, and I’ll stay available for any context or clarifications needed from my side. |
|
Its great @jakobmoellerdev , if you suggest I can plan some meetings from next week (once a week) so we can collaborate smoothy, let me know your teams availability |
Hey @jakobmoellerdev , pls share an update on this PR |
|
I will rebase and get this PR ready next sprint (starts Nov18) |
|
Hey folks, closing this in favor of #1676. PTAL there if youre looking for progress. @dynamic-Archu i made you coauthor of a commit in that PR that contains all the changes you had in here as the 58 commits got a bit messy. This will now undergo regular review from the OCM team. |



Optimized approach for OCM transfer by implementing a concurrent worker pool, replacing the previous sequential transfer logic.
What this PR does / why we need it
This PR introduces an optimized approach for OCM transfer by implementing a concurrent worker pool, replacing the previous sequential transfer logic.
Improvements:
Reduced transfer time from ~45 minutes to ~23 minutes 40 seconds.
Achieved nearly 2x speedup by leveraging concurrent processing.
Introduced a configurable concurrent worker pool to parallelize transfers.
Technical Approach:
Replaced copyVersion with a new function copyVersionWithWorkerPool, using a bounded goroutine worker pool
Introduced a transferTask abstraction to encapsulate resource and source transfer logic
Enabled parallel transfer of artifacts using a concurrent worker pool (default size: 5 workers)
Tuned worker pool size for optimal performance and resource utilization.
Why:
Introducing concurrency significantly improves transfer speed and overall efficiency.
Which issue(s) this PR fixes