Skip to content

Commit 544c494

Browse files
fix: set maximum of goroutines
1 parent a95cd85 commit 544c494

11 files changed

Lines changed: 67 additions & 82 deletions

File tree

cmd/workers.go

Lines changed: 30 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,24 @@
11
package cmd
22

33
import (
4-
"github.com/checkmarx/2ms/lib/secrets"
5-
"sync"
6-
74
"github.com/checkmarx/2ms/engine"
85
"github.com/checkmarx/2ms/engine/extra"
6+
"golang.org/x/sync/errgroup"
97
)
108

119
func processItems(engine *engine.Engine, pluginName string) {
1210
defer channels.WaitGroup.Done()
1311

14-
wgItems := &sync.WaitGroup{}
12+
g := errgroup.Group{}
13+
g.SetLimit(1000)
1514
for item := range channels.Items {
1615
report.TotalItemsScanned++
17-
wgItems.Add(1)
18-
go engine.Detect(item, secretsChan, wgItems, pluginName, channels.Errors)
16+
g.Go(func() error {
17+
engine.Detect(item, secretsChan, pluginName, channels.Errors)
18+
return nil
19+
})
1920
}
20-
wgItems.Wait()
21+
g.Wait()
2122
close(secretsChan)
2223
}
2324

@@ -42,37 +43,43 @@ func processSecrets() {
4243
func processSecretsExtras() {
4344
defer channels.WaitGroup.Done()
4445

45-
wgExtras := &sync.WaitGroup{}
46+
g := errgroup.Group{}
47+
g.SetLimit(10)
4648
for secret := range secretsExtrasChan {
47-
wgExtras.Add(1)
48-
go extra.AddExtraToSecret(secret, wgExtras)
49+
g.Go(func() error {
50+
extra.AddExtraToSecret(secret)
51+
return nil
52+
})
4953
}
50-
wgExtras.Wait()
54+
g.Wait()
5155
}
5256

5357
func processValidationAndScoreWithValidation(engine *engine.Engine) {
5458
defer channels.WaitGroup.Done()
5559

56-
wgValidation := &sync.WaitGroup{}
60+
g := errgroup.Group{}
61+
g.SetLimit(10)
5762
for secret := range validationChan {
58-
wgValidation.Add(2)
59-
go func(secret *secrets.Secret, wg *sync.WaitGroup) {
60-
engine.RegisterForValidation(secret, wg)
61-
engine.Score(secret, true, wg)
62-
}(secret, wgValidation)
63+
g.Go(func() error {
64+
engine.RegisterForValidation(secret)
65+
engine.Score(secret, true)
66+
return nil
67+
})
6368
}
64-
wgValidation.Wait()
65-
69+
g.Wait()
6670
engine.Validate()
6771
}
6872

6973
func processScoreWithoutValidation(engine *engine.Engine) {
7074
defer channels.WaitGroup.Done()
7175

72-
wgScore := &sync.WaitGroup{}
76+
g := errgroup.Group{}
77+
g.SetLimit(10)
7378
for secret := range cvssScoreWithoutValidationChan {
74-
wgScore.Add(1)
75-
go engine.Score(secret, false, wgScore)
79+
g.Go(func() error {
80+
engine.Score(secret, false)
81+
return nil
82+
})
7683
}
77-
wgScore.Wait()
84+
g.Wait()
7885
}

engine/engine.go

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,14 @@ package engine
33
import (
44
"crypto/sha1"
55
"fmt"
6-
"github.com/checkmarx/2ms/engine/linecontent"
7-
"github.com/checkmarx/2ms/engine/score"
86
"os"
97
"regexp"
108
"strings"
11-
"sync"
129
"text/tabwriter"
1310

11+
"github.com/checkmarx/2ms/engine/linecontent"
12+
"github.com/checkmarx/2ms/engine/score"
13+
1414
"github.com/checkmarx/2ms/engine/rules"
1515
"github.com/checkmarx/2ms/engine/validation"
1616
"github.com/checkmarx/2ms/lib/secrets"
@@ -78,9 +78,7 @@ func Init(engineConfig EngineConfig) (*Engine, error) {
7878
}, nil
7979
}
8080

81-
func (e *Engine) Detect(item plugins.ISourceItem, secretsChannel chan *secrets.Secret, wg *sync.WaitGroup, pluginName string, errors chan error) {
82-
defer wg.Done()
83-
81+
func (e *Engine) Detect(item plugins.ISourceItem, secretsChannel chan *secrets.Secret, pluginName string, errors chan error) {
8482
fragment := detect.Fragment{
8583
Raw: *item.GetContent(),
8684
FilePath: item.GetSource(),
@@ -137,13 +135,11 @@ func (e *Engine) AddRegexRules(patterns []string) error {
137135
return nil
138136
}
139137

140-
func (s *Engine) RegisterForValidation(secret *secrets.Secret, wg *sync.WaitGroup) {
141-
defer wg.Done()
138+
func (s *Engine) RegisterForValidation(secret *secrets.Secret) {
142139
s.validator.RegisterForValidation(secret)
143140
}
144141

145-
func (s *Engine) Score(secret *secrets.Secret, validateFlag bool, wg *sync.WaitGroup) {
146-
defer wg.Done()
142+
func (s *Engine) Score(secret *secrets.Secret, validateFlag bool) {
147143
validationStatus := secrets.UnknownResult // default validity
148144
if validateFlag {
149145
validationStatus = secret.ValidationStatus

engine/engine_test.go

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,10 @@ package engine
22

33
import (
44
"fmt"
5-
"github.com/stretchr/testify/assert"
6-
"sync"
75
"testing"
86

7+
"github.com/stretchr/testify/assert"
8+
99
"github.com/checkmarx/2ms/engine/rules"
1010
"github.com/checkmarx/2ms/lib/secrets"
1111
"github.com/checkmarx/2ms/plugins"
@@ -79,9 +79,7 @@ func TestDetector(t *testing.T) {
7979

8080
secretsChan := make(chan *secrets.Secret, 1)
8181
errorsChan := make(chan error, 1)
82-
wg := &sync.WaitGroup{}
83-
wg.Add(1)
84-
detector.Detect(i, secretsChan, wg, fsPlugin.GetName(), errorsChan)
82+
detector.Detect(i, secretsChan, fsPlugin.GetName(), errorsChan)
8583
close(secretsChan)
8684

8785
s := <-secretsChan
@@ -155,9 +153,7 @@ func TestSecrets(t *testing.T) {
155153
fmt.Printf("Start test %s", name)
156154
secretsChan := make(chan *secrets.Secret, 1)
157155
errorsChan := make(chan error, 1)
158-
wg := &sync.WaitGroup{}
159-
wg.Add(1)
160-
detector.Detect(item{content: &secret.Content}, secretsChan, wg, fsPlugin.GetName(), errorsChan)
156+
detector.Detect(item{content: &secret.Content}, secretsChan, fsPlugin.GetName(), errorsChan)
161157
close(secretsChan)
162158
close(errorsChan)
163159

engine/extra/extra.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"encoding/json"
66
"fmt"
77
"strings"
8-
"sync"
98

109
"github.com/checkmarx/2ms/lib/secrets"
1110
)
@@ -16,8 +15,7 @@ var ruleIDToFunction = map[string]addExtraFunc{
1615
"jwt": addExtraJWT,
1716
}
1817

19-
func AddExtraToSecret(secret *secrets.Secret, wg *sync.WaitGroup) {
20-
defer wg.Done()
18+
func AddExtraToSecret(secret *secrets.Secret) {
2119
if addExtra, ok := ruleIDToFunction[secret.RuleID]; ok {
2220
extraData := addExtra(secret)
2321
if extraData != nil && extraData != "" {

engine/extra/extra_test.go

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,10 @@ package extra
33
import (
44
"encoding/base64"
55
"fmt"
6+
"testing"
7+
68
"github.com/checkmarx/2ms/lib/secrets"
79
"github.com/stretchr/testify/assert"
8-
"sync"
9-
"testing"
1010
)
1111

1212
func TestAddExtraToSecret(t *testing.T) {
@@ -50,10 +50,7 @@ func TestAddExtraToSecret(t *testing.T) {
5050
ExtraDetails: make(map[string]interface{}),
5151
}
5252

53-
var wg sync.WaitGroup
54-
wg.Add(1)
55-
AddExtraToSecret(secret, &wg)
56-
wg.Wait()
53+
AddExtraToSecret(secret)
5754

5855
assert.Equal(t, tt.expectedOutput, secret.ExtraDetails["secretDetails"])
5956
})

engine/score/score_test.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
11
package score_test
22

33
import (
4+
"sync"
5+
"testing"
6+
47
. "github.com/checkmarx/2ms/engine"
58
"github.com/checkmarx/2ms/engine/rules"
69
"github.com/checkmarx/2ms/engine/score"
710
"github.com/checkmarx/2ms/lib/secrets"
811
"github.com/stretchr/testify/assert"
912
ruleConfig "github.com/zricethezav/gitleaks/v8/cmd/generate/config/rules"
10-
"sync"
11-
"testing"
1213
)
1314

1415
func TestScore(t *testing.T) {
@@ -216,9 +217,9 @@ func TestScore(t *testing.T) {
216217
expectedRuleScores := expectedCvssScores[secret.RuleID]
217218
validityIndex := getValidityIndex(secret.ValidationStatus)
218219
unknownIndex := getValidityIndex(secrets.UnknownResult)
219-
engine.Score(secret, true, &wg)
220+
engine.Score(secret, true)
220221
assert.Equal(t, expectedRuleScores[validityIndex], secret.CvssScore, "rule: %s", secret.RuleID)
221-
engine.Score(secret, false, &wg)
222+
engine.Score(secret, false)
222223
assert.Equal(t, expectedRuleScores[unknownIndex], secret.CvssScore, "rule: %s", secret.RuleID)
223224
}
224225
}

engine/validation/pairs.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
package validation
22

33
import (
4-
"sync"
5-
64
"github.com/checkmarx/2ms/lib/secrets"
75
)
86

@@ -38,8 +36,7 @@ func (p *pairsCollector) addIfNeeded(secret *secrets.Secret) bool {
3836
return true
3937
}
4038

41-
func (p *pairsCollector) validate(generalKey string, rulesById pairsByRuleId, wg *sync.WaitGroup) {
42-
defer wg.Done()
39+
func (p *pairsCollector) validate(generalKey string, rulesById pairsByRuleId) {
4340
generalKeyToValidation[generalKey](rulesById)
4441
}
4542

engine/validation/validator.go

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
package validation
22

33
import (
4-
"sync"
5-
64
"github.com/checkmarx/2ms/engine/extra"
75
"github.com/checkmarx/2ms/lib/secrets"
86
)
@@ -35,14 +33,11 @@ func (v *Validator) RegisterForValidation(secret *secrets.Secret) {
3533
}
3634

3735
func (v *Validator) Validate() {
38-
wg := &sync.WaitGroup{}
3936
for generalKey, bySource := range v.pairsCollector.pairs {
4037
for _, byRule := range bySource {
41-
wg.Add(1)
42-
v.pairsCollector.validate(generalKey, byRule, wg)
38+
v.pairsCollector.validate(generalKey, byRule)
4339
}
4440
}
45-
wg.Wait()
4641
}
4742

4843
func IsCanValidateRule(ruleID string) bool {

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ require (
1212
github.com/spf13/viper v1.18.2-0.20240419203757-d539b7a2462e
1313
github.com/stretchr/testify v1.9.0
1414
github.com/zricethezav/gitleaks/v8 v8.18.2
15+
golang.org/x/sync v0.10.0
1516
golang.org/x/time v0.5.0
1617
gopkg.in/yaml.v3 v3.0.1
1718
)
@@ -48,7 +49,6 @@ require (
4849
go.uber.org/multierr v1.11.0 // indirect
4950
golang.org/x/crypto v0.32.0 // indirect
5051
golang.org/x/exp v0.0.0-20240416160154-fe59bbe5cc7f // indirect
51-
golang.org/x/sync v0.10.0 // indirect
5252
golang.org/x/sys v0.29.0 // indirect
5353
golang.org/x/text v0.21.0 // indirect
5454
gopkg.in/ini.v1 v1.67.0 // indirect

plugins/filesystem.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,11 @@ import (
44
"fmt"
55
"os"
66
"path/filepath"
7-
"sync"
87
"time"
98

109
"github.com/rs/zerolog/log"
1110
"github.com/spf13/cobra"
11+
"golang.org/x/sync/errgroup"
1212
)
1313

1414
const (
@@ -38,9 +38,7 @@ func (p *FileSystemPlugin) DefineCommand(items chan ISourceItem, errors chan err
3838
Run: func(cmd *cobra.Command, args []string) {
3939
log.Info().Msg("Folder plugin started")
4040

41-
wg := &sync.WaitGroup{}
42-
p.getFiles(items, errors, wg)
43-
wg.Wait()
41+
p.getFiles(items, errors)
4442
close(items)
4543
},
4644
}
@@ -60,7 +58,7 @@ func (p *FileSystemPlugin) DefineCommand(items chan ISourceItem, errors chan err
6058
return cmd, nil
6159
}
6260

63-
func (p *FileSystemPlugin) getFiles(items chan ISourceItem, errs chan error, wg *sync.WaitGroup) {
61+
func (p *FileSystemPlugin) getFiles(items chan ISourceItem, errs chan error) {
6462
fileList := make([]string, 0)
6563
err := filepath.Walk(p.Path, func(path string, fInfo os.FileInfo, err error) error {
6664
if err != nil {
@@ -98,23 +96,25 @@ func (p *FileSystemPlugin) getFiles(items chan ISourceItem, errs chan error, wg
9896
return
9997
}
10098

101-
p.getItems(items, errs, wg, fileList)
99+
p.getItems(items, errs, fileList)
102100
}
103101

104-
func (p *FileSystemPlugin) getItems(items chan ISourceItem, errs chan error, wg *sync.WaitGroup, fileList []string) {
102+
func (p *FileSystemPlugin) getItems(items chan ISourceItem, errs chan error, fileList []string) {
103+
g := errgroup.Group{}
104+
g.SetLimit(1000)
105105
for _, filePath := range fileList {
106-
wg.Add(1)
107-
go func(filePath string) {
108-
defer wg.Done()
106+
g.Go(func() error {
109107
actualFile, err := p.getItem(filePath)
110108
if err != nil {
111109
errs <- err
112110
time.Sleep(time.Second) // Temporary fix for incorrect non-error exits; needs a better solution.
113-
return
111+
return nil
114112
}
115113
items <- *actualFile
116-
}(filePath)
114+
return nil
115+
})
117116
}
117+
g.Wait()
118118
}
119119

120120
func (p *FileSystemPlugin) getItem(filePath string) (*item, error) {

0 commit comments

Comments
 (0)