Skip to content

Commit 908d58a

Browse files
petr-mullerclaude
andcommitted
fix(private-org-sync): process repos in parallel with configurable workers
Extract per-repo sync logic into a syncRepo function and process repos concurrently using a worker pool. The --parallelism flag (default 4) controls the number of concurrent workers. Each worker gets its own copy of the gitSyncer struct, avoiding races on the mutable logger field. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 0d3999e commit 908d58a

1 file changed

Lines changed: 37 additions & 9 deletions

File tree

cmd/private-org-sync/main.go

Lines changed: 37 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"os/exec"
1010
"path/filepath"
1111
"strings"
12+
"sync"
1213
"time"
1314

1415
"github.com/sirupsen/logrus"
@@ -65,6 +66,7 @@ type options struct {
6566
confirm bool
6667
failOnNonexistentDst bool
6768
debug bool
69+
parallelism int
6870
}
6971

7072
const defaultPrefix = "https://github.com"
@@ -144,6 +146,7 @@ func gatherOptions() options {
144146
fs.BoolVar(&o.failOnNonexistentDst, "fail-on-missing-destination", false, "Set true to make the tool to consider missing sync destination as an error")
145147

146148
fs.BoolVar(&o.debug, "debug", false, "Set true to enable debug logging level")
149+
fs.IntVar(&o.parallelism, "parallelism", 4, "Number of repos to sync in parallel")
147150

148151
o.Options.Bind(fs)
149152
o.WhitelistOptions.Bind(fs)
@@ -362,6 +365,8 @@ type location struct {
362365
org, repo, branch string
363366
}
364367

368+
type repoKey struct{ org, repo string }
369+
365370
func (l location) String() string {
366371
return fmt.Sprintf("%s/%s@%s", l.org, l.repo, l.branch)
367372
}
@@ -773,7 +778,6 @@ func main() {
773778
}
774779

775780
// Group locations by (org, repo) so we can initialize each repo once
776-
type repoKey struct{ org, repo string }
777781
grouped := make(map[repoKey][]location)
778782
for source := range locations {
779783
key := repoKey{org: source.org, repo: source.repo}
@@ -786,16 +790,40 @@ func main() {
786790
flattenedOrgs.Insert(o.org)
787791
}
788792

789-
for key, branches := range grouped {
790-
syncer.logger = logrus.WithFields(logrus.Fields{"org": key.org, "repo": key.repo})
791-
792-
dstRepo := key.repo
793-
if !flattenedOrgs.Has(key.org) {
794-
dstRepo = fmt.Sprintf("%s-%s", key.org, key.repo)
795-
}
793+
type repoWork struct {
794+
key repoKey
795+
branches []location
796+
}
797+
work := make(chan repoWork)
798+
var errsMu sync.Mutex
799+
var wg sync.WaitGroup
800+
801+
for i := 0; i < o.parallelism; i++ {
802+
wg.Add(1)
803+
go func() {
804+
defer wg.Done()
805+
for item := range work {
806+
repoSyncer := syncer
807+
repoSyncer.logger = logrus.WithFields(logrus.Fields{"org": item.key.org, "repo": item.key.repo})
808+
dstRepo := item.key.repo
809+
if !flattenedOrgs.Has(item.key.org) {
810+
dstRepo = fmt.Sprintf("%s-%s", item.key.org, item.key.repo)
811+
}
812+
repoErrs := repoSyncer.syncRepo(item.key.org, item.key.repo, o.targetOrg, dstRepo, item.branches)
813+
if len(repoErrs) > 0 {
814+
errsMu.Lock()
815+
errs = append(errs, repoErrs...)
816+
errsMu.Unlock()
817+
}
818+
}
819+
}()
820+
}
796821

797-
errs = append(errs, syncer.syncRepo(key.org, key.repo, o.targetOrg, dstRepo, branches)...)
822+
for key, branches := range grouped {
823+
work <- repoWork{key: key, branches: branches}
798824
}
825+
close(work)
826+
wg.Wait()
799827

800828
if len(errs) > 0 {
801829
logrus.WithError(utilerrors.NewAggregate(errs)).Fatal("There were failures")

0 commit comments

Comments
 (0)