Skip to content

Commit 10d45b4

Browse files
refactor: created dedicated packages for semaphore and chunk
1 parent 7249d81 commit 10d45b4

8 files changed

Lines changed: 391 additions & 372 deletions

File tree

cmd/workers.go

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,24 +6,21 @@ import (
66
"github.com/checkmarx/2ms/engine/extra"
77
"github.com/checkmarx/2ms/lib/secrets"
88
"golang.org/x/sync/errgroup"
9-
"golang.org/x/sync/semaphore"
109
"sync"
1110
)
1211

13-
func ProcessItems(engineInstance *engine.Engine, pluginName string) {
12+
func ProcessItems(engineInstance engine.IEngine, pluginName string) {
1413
defer Channels.WaitGroup.Done()
1514

1615
g, ctx := errgroup.WithContext(context.Background())
17-
memoryBudget := chooseMemoryBudget()
18-
sem := semaphore.NewWeighted(memoryBudget)
1916
for item := range Channels.Items {
2017
Report.TotalItemsScanned++
2118
item := item
2219

2320
switch pluginName {
2421
case "filesystem":
2522
g.Go(func() error {
26-
return engineInstance.DetectFile(ctx, item, SecretsChan, memoryBudget, sem)
23+
return engineInstance.DetectFile(ctx, item, SecretsChan)
2724
})
2825
default:
2926
g.Go(func() error {
@@ -67,7 +64,7 @@ func ProcessSecretsExtras() {
6764
wgExtras.Wait()
6865
}
6966

70-
func ProcessValidationAndScoreWithValidation(engine *engine.Engine) {
67+
func ProcessValidationAndScoreWithValidation(engine engine.IEngine) {
7168
defer Channels.WaitGroup.Done()
7269

7370
wgValidation := &sync.WaitGroup{}
@@ -83,7 +80,7 @@ func ProcessValidationAndScoreWithValidation(engine *engine.Engine) {
8380
engine.Validate()
8481
}
8582

86-
func ProcessScoreWithoutValidation(engine *engine.Engine) {
83+
func ProcessScoreWithoutValidation(engine engine.IEngine) {
8784
defer Channels.WaitGroup.Done()
8885

8986
wgScore := &sync.WaitGroup{}

engine/chunk/chunk.go

Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,196 @@
1+
package chunk
2+
3+
//go:generate mockgen -source=$GOFILE -destination=${GOPACKAGE}_mock.go -package=${GOPACKAGE}
4+
5+
import (
6+
"bufio"
7+
"bytes"
8+
"errors"
9+
"fmt"
10+
"io"
11+
"sync"
12+
"unicode"
13+
14+
"github.com/h2non/filetype"
15+
)
16+
17+
// Default sizing that NewChunk passes to NewChunkWithSize.
const (
18+
// defaultSize is the length, in bytes, of each pooled read buffer (100 KiB).
defaultSize = 100 * 1024 // 100 KiB
19+
// defaultMaxPeekSize caps how many bytes may be read past a chunk while looking for a safe boundary (25 KiB).
defaultMaxPeekSize = 25 * 1024 // 25 KiB
20+
// defaultFileThreshold is the default small-file threshold stored on the Chunk (1 MiB).
defaultFileThreshold = 1 * 1024 * 1024 // 1 MiB
21+
)
22+
23+
// ErrUnsupportedFileType is the sentinel that ReadChunk wraps when ShouldSkipFile rejects the first chunk of a file.
var ErrUnsupportedFileType = errors.New("unsupported file type")
24+
25+
// Chunk bundles the buffer pools and size limits used to read a file in pieces.
type Chunk struct {
26+
// BufPool hands out *[]byte read buffers.
BufPool *sync.Pool
27+
// PeekBufPool hands out *bytes.Buffer values used to assemble a chunk plus any extra bytes read past it.
PeekBufPool *sync.Pool
28+
// Size is the read-buffer length that NewChunkWithSize configures BufPool with.
Size int
29+
// MaxPeekSize bounds how many bytes readUntilSafeBoundary may append beyond the initial chunk.
MaxPeekSize int
30+
// SmallFileThreshold is the value reported by GetFileThreshold.
SmallFileThreshold int64
31+
}
32+
33+
// IChunk is the method set implemented by *Chunk; the go:generate directive in this file builds a mock from it.
type IChunk interface {
34+
// GetBuf fetches a read buffer; the bool reports whether the pooled value had the expected type.
GetBuf() (*[]byte, bool)
35+
// PutBuf returns a read buffer to its pool.
PutBuf(buf *[]byte)
36+
// GetPeekBuf fetches a peek buffer seeded with buf; the bool reports whether the pooled value had the expected type.
GetPeekBuf(buf []byte) (*bytes.Buffer, bool)
37+
// PutPeekBuf returns a peek buffer to its pool.
PutPeekBuf(buf *bytes.Buffer)
38+
// GetSize reports the configured chunk size.
GetSize() int
39+
// GetMaxPeekSize reports the configured peek limit.
GetMaxPeekSize() int
40+
// GetFileThreshold reports the configured small-file threshold.
GetFileThreshold() int64
41+
// ReadChunk reads the next chunk from reader and returns it as a string.
ReadChunk(reader *bufio.Reader, totalLines int) (string, error)
42+
}
43+
44+
// NewChunk builds a Chunk configured with the package defaults
// (defaultSize, defaultMaxPeekSize, defaultFileThreshold).
func NewChunk() *Chunk {
45+
return NewChunkWithSize(defaultSize, defaultMaxPeekSize, defaultFileThreshold)
46+
}
47+
48+
func NewChunkWithSize(size, maxPeekSize, smallFileThreshold int) *Chunk {
49+
return &Chunk{
50+
BufPool: &sync.Pool{
51+
New: func() interface{} {
52+
b := make([]byte, size)
53+
return &b
54+
},
55+
},
56+
PeekBufPool: &sync.Pool{
57+
New: func() interface{} {
58+
// pre-allocate enough capacity for initial chunk + peek
59+
return bytes.NewBuffer(make([]byte, 0, size+maxPeekSize))
60+
},
61+
},
62+
Size: size,
63+
MaxPeekSize: maxPeekSize,
64+
SmallFileThreshold: int64(smallFileThreshold),
65+
}
66+
}
67+
68+
func (c *Chunk) GetBuf() (*[]byte, bool) {
69+
buf, ok := c.BufPool.Get().(*[]byte)
70+
return buf, ok
71+
}
72+
73+
// PutBuf returns buf to BufPool so a later GetBuf can reuse it.
func (c *Chunk) PutBuf(buf *[]byte) {
74+
c.BufPool.Put(buf)
75+
}
76+
77+
func (c *Chunk) GetPeekBuf(buf []byte) (*bytes.Buffer, bool) {
78+
peekBuf, ok := c.PeekBufPool.Get().(*bytes.Buffer)
79+
peekBuf.Reset()
80+
peekBuf.Write(buf) // seed with buf
81+
return peekBuf, ok
82+
}
83+
84+
// PutPeekBuf returns buf to PeekBufPool; its contents are not cleared here (GetPeekBuf resets on reuse).
func (c *Chunk) PutPeekBuf(buf *bytes.Buffer) {
85+
c.PeekBufPool.Put(buf)
86+
}
87+
88+
// GetSize returns the Size field.
func (c *Chunk) GetSize() int {
89+
return c.Size
90+
}
91+
92+
// GetMaxPeekSize returns the MaxPeekSize field.
func (c *Chunk) GetMaxPeekSize() int {
93+
return c.MaxPeekSize
94+
}
95+
96+
// GetFileThreshold returns the SmallFileThreshold field.
func (c *Chunk) GetFileThreshold() int64 {
97+
return c.SmallFileThreshold
98+
}
99+
100+
// ReadChunk reads the next chunk of data from file
101+
func (c *Chunk) ReadChunk(reader *bufio.Reader, totalLines int) (string, error) {
102+
chunk, ok := c.GetBuf()
103+
if !ok {
104+
return "", fmt.Errorf("expected *[]byte, got %T", chunk)
105+
}
106+
defer c.PutBuf(chunk)
107+
108+
n, err := reader.Read(*chunk)
109+
var chunkStr string
110+
// "Callers should always process the n > 0 bytes returned before considering the error err."
111+
// https://pkg.go.dev/io#Reader
112+
if n > 0 {
113+
// only check the filetype at the start of file
114+
if totalLines == 0 && ShouldSkipFile((*chunk)[:n]) {
115+
return "", fmt.Errorf("skipping file: %w", ErrUnsupportedFileType)
116+
}
117+
118+
chunkStr, err = c.processChunk(reader, (*chunk)[:n])
119+
if err != nil {
120+
return "", err
121+
}
122+
}
123+
if err != nil {
124+
return "", err
125+
}
126+
return chunkStr, nil
127+
}
128+
129+
// processChunk processes the chunk, reading until a safe boundary
130+
func (c *Chunk) processChunk(reader *bufio.Reader, chunk []byte) (string, error) {
131+
peekBuf, ok := c.GetPeekBuf(chunk)
132+
if !ok {
133+
return "", fmt.Errorf("expected *bytes.Buffer, got %T", peekBuf)
134+
}
135+
defer c.PutPeekBuf(peekBuf)
136+
137+
if readErr := c.readUntilSafeBoundary(reader, len(chunk), peekBuf); readErr != nil {
138+
return "", fmt.Errorf("failed to read until safe boundary for file: %w", readErr)
139+
}
140+
141+
return peekBuf.String(), nil
142+
}
143+
144+
// readUntilSafeBoundary (hopefully) avoids splitting (https://github.com/gitleaks/gitleaks/issues/1651)
145+
func (c *Chunk) readUntilSafeBoundary(r *bufio.Reader, n int, peekBuf *bytes.Buffer) error {
146+
if peekBuf.Len() == 0 {
147+
return nil
148+
}
149+
150+
// keep reading until see our “\n…\n” boundary or hit limits
151+
for peekBuf.Len()-n < c.MaxPeekSize {
152+
if endsWithTwoNewlines(peekBuf.Bytes()) {
153+
return nil
154+
}
155+
156+
b, err := r.ReadByte()
157+
if err != nil {
158+
if err == io.EOF {
159+
return nil
160+
}
161+
return fmt.Errorf("failed to read byte: %w", err)
162+
}
163+
peekBuf.WriteByte(b)
164+
}
165+
166+
return nil
167+
}
168+
169+
// endsWithTwoNewlines returns true if b ends in at least two '\n's (ignoring any number of ' ', '\r', or '\t' between them)
170+
// endsWithTwoNewlines reports whether the tail of b contains at least two
// '\n' bytes with nothing but whitespace (as judged by unicode.IsSpace on
// each byte) between them and after them.
func endsWithTwoNewlines(b []byte) bool {
	newlines := 0
	// Walk backwards from the last byte; the first non-whitespace byte ends the search.
	for pos := len(b); pos > 0; pos-- {
		ch := b[pos-1]
		switch {
		case ch == '\n':
			newlines++
			if newlines >= 2 {
				return true
			}
		case !unicode.IsSpace(rune(ch)):
			return false
		}
		// Any other whitespace ('\r', ' ', '\t', ...) is skipped without resetting the count.
	}
	return false
}
187+
188+
// ShouldSkipFile checks if the file should be skipped based on its content type
189+
func ShouldSkipFile(data []byte) bool {
190+
// TODO: could other optimizations be introduced here?
191+
mimetype, err := filetype.Match(data)
192+
if err != nil {
193+
return true // could not determine file type
194+
}
195+
return mimetype.MIME.Type == "application" // skip binary files
196+
}

0 commit comments

Comments
 (0)