Skip to content

Commit ebc692e

Browse files
M09Icclaude
andcommitted
fix: use ctx as unified shutdown signal to prevent channel close panics
- Replace additionClosed atomic with ctx-based guard in addAddition - Add handlerDone chan to coordinate Handler goroutine lifecycle - Add sendProcess method with select+ctx.Done protection - Protect putToOutput/putToFuzzy/checkCh sends with select+ctx.Done - Rewrite BrutePool.Close/CheckPool.Close with correct shutdown order: Cancel → Release pools → close(processCh) → <-handlerDone - Fix doCrawl orphan wg.Add(1) with defer wg.Done() - Fix analyzeDone inverted condition (replaced with handlerDone chan) - Add ctx.Done check to Run monitor goroutine Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent dd6251f commit ebc692e

3 files changed

Lines changed: 60 additions & 44 deletions

File tree

core/pool/brutepool.go

Lines changed: 27 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -52,10 +52,11 @@ func NewBrutePool(ctx context.Context, config *Config) (*BrutePool, error) {
5252
Timeout: config.Timeout,
5353
ProxyClient: config.ProxyClient,
5454
}),
55-
additionCh: make(chan *Unit, config.Thread*10),
56-
closeCh: make(chan struct{}),
57-
processCh: make(chan *baseline.Baseline, config.Thread*2),
58-
wg: &sync.WaitGroup{},
55+
additionCh: make(chan *Unit, config.Thread*10),
56+
closeCh: make(chan struct{}),
57+
processCh: make(chan *baseline.Baseline, config.Thread*2),
58+
wg: &sync.WaitGroup{},
59+
handlerDone: make(chan struct{}),
5960
},
6061
base: u.Scheme + "://" + u.Host,
6162
isDir: strings.HasSuffix(u.Path, "/"),
@@ -105,7 +106,6 @@ type BrutePool struct {
105106
urls sync.Map
106107
scopeurls map[string]struct{}
107108
uniques map[uint16]struct{}
108-
analyzeDone bool
109109
limiter *rate.Limiter
110110
locker sync.Mutex
111111
scopeLocker sync.Mutex
@@ -197,6 +197,11 @@ func (pool *BrutePool) Run(offset, limit int) {
197197
close(pool.closeCh)
198198
return
199199
}
200+
select {
201+
case <-pool.ctx.Done():
202+
return
203+
default:
204+
}
200205
time.Sleep(100 * time.Millisecond)
201206
}
202207
}()
@@ -376,7 +381,7 @@ func (pool *BrutePool) Invoke(v interface{}) {
376381

377382
case parsers.WordSource:
378383
// 异步进行性能消耗较大的深度对比
379-
pool.processCh <- bl
384+
pool.sendProcess(bl)
380385
if int(pool.Statistor.ReqTotal)%pool.CheckPeriod == 0 {
381386
// 间歇插入check waf的探针
382387
pool.doCheck()
@@ -388,9 +393,9 @@ func (pool *BrutePool) Invoke(v interface{}) {
388393
pool.Bar.Done()
389394
case parsers.RedirectSource:
390395
bl.FrontURL = unit.frontUrl
391-
pool.processCh <- bl
396+
pool.sendProcess(bl)
392397
default:
393-
pool.processCh <- bl
398+
pool.sendProcess(bl)
394399
}
395400
}
396401

@@ -432,6 +437,7 @@ func (pool *BrutePool) NoScopeInvoke(v interface{}) {
432437
}
433438

434439
func (pool *BrutePool) Handler() {
440+
defer close(pool.handlerDone)
435441
for bl := range pool.processCh {
436442
if bl.IsValid {
437443
pool.addFuzzyBaseline(bl)
@@ -516,8 +522,6 @@ func (pool *BrutePool) Handler() {
516522
}
517523
pool.wg.Done()
518524
}
519-
520-
pool.analyzeDone = true
521525
}
522526

523527
func (pool *BrutePool) checkRedirect(redirectURL string) bool {
@@ -674,16 +678,12 @@ func (pool *BrutePool) fallback() {
674678
}
675679

676680
func (pool *BrutePool) Close() {
677-
for pool.analyzeDone {
678-
// 等待缓存的待处理任务完成
679-
time.Sleep(time.Duration(100) * time.Millisecond)
680-
}
681-
pool.additionClosed.Store(true)
682-
// additionCh 可能仍有异步 producer(redirect/crawl/retry/append),
683-
// 依赖 closeCh/ctx 停止消费循环,不直接关闭 channel
684-
pool.Statistor.EndTime = time.Now().Unix()
681+
pool.Cancel()
685682
pool.reqPool.Release()
686683
pool.scopePool.Release()
684+
close(pool.processCh)
685+
<-pool.handlerDone
686+
pool.Statistor.EndTime = time.Now().Unix()
687687
}
688688

689689
func (pool *BrutePool) safePath(u string) string {
@@ -714,9 +714,15 @@ func (pool *BrutePool) doCheck() {
714714
}
715715

716716
if pool.Mod == HostSpray {
717-
pool.checkCh <- struct{}{}
717+
select {
718+
case pool.checkCh <- struct{}{}:
719+
case <-pool.ctx.Done():
720+
}
718721
} else if pool.Mod == PathSpray {
719-
pool.checkCh <- struct{}{}
722+
select {
723+
case pool.checkCh <- struct{}{}:
724+
case <-pool.ctx.Done():
725+
}
720726
}
721727
}
722728

@@ -756,6 +762,7 @@ func (pool *BrutePool) doCrawl(bl *baseline.Baseline) {
756762
pool.doScopeCrawl(bl)
757763

758764
go func() {
765+
defer pool.wg.Done()
759766
for _, u := range bl.URLs {
760767
if u = pkg.FormatURL(bl.Url.Path, u); u == "" {
761768
continue

core/pool/checkpool.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ func NewCheckPool(ctx context.Context, config *Config) (*CheckPool, error) {
3434
additionCh: make(chan *Unit, config.Thread*10),
3535
closeCh: make(chan struct{}),
3636
processCh: make(chan *baseline.Baseline, config.Thread*2),
37+
handlerDone: make(chan struct{}),
3738
},
3839
}
3940
pool.Request.Headers.Set("Connection", "close")
@@ -105,6 +106,9 @@ Loop:
105106
pool.Close()
106107
}
107108
func (pool *CheckPool) Close() {
109+
pool.Cancel()
110+
close(pool.processCh)
111+
<-pool.handlerDone
108112
pool.Bar.Close()
109113
pool.Pool.Release()
110114
}
@@ -139,7 +143,7 @@ func (pool *CheckPool) Invoke(v interface{}) {
139143
ReqDepth: unit.depth,
140144
},
141145
}
142-
pool.processCh <- bl
146+
pool.sendProcess(bl)
143147
return
144148
}
145149
start := time.Now()
@@ -172,10 +176,11 @@ func (pool *CheckPool) Invoke(v interface{}) {
172176
if bl.RedirectURL != "" {
173177
pool.doRedirect(bl, bl.ReqDepth)
174178
}
175-
pool.processCh <- bl
179+
pool.sendProcess(bl)
176180
}
177181

178182
func (pool *CheckPool) Handler() {
183+
defer close(pool.handlerDone)
179184
for bl := range pool.processCh {
180185
if bl.IsValid {
181186
params := map[string]interface{}{

core/pool/pool.go

Lines changed: 26 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,13 @@ type BasePool struct {
2121
ctx context.Context
2222
processCh chan *baseline.Baseline // 待处理的baseline
2323

24-
reqCount int
25-
failedCount int
26-
additionCh chan *Unit
27-
additionClosed atomic.Bool
28-
closeCh chan struct{}
29-
wg *sync.WaitGroup
30-
isFallback atomic.Bool
24+
reqCount int
25+
failedCount int
26+
additionCh chan *Unit
27+
closeCh chan struct{}
28+
wg *sync.WaitGroup
29+
handlerDone chan struct{}
30+
isFallback atomic.Bool
3131
}
3232

3333
func (pool *BasePool) doRetry(bl *baseline.Baseline) {
@@ -45,25 +45,21 @@ func (pool *BasePool) doRetry(bl *baseline.Baseline) {
4545
}
4646

4747
func (pool *BasePool) addAddition(u *Unit) {
48-
if pool.ctx.Err() != nil || pool.additionClosed.Load() {
48+
if pool.ctx.Err() != nil {
4949
return
5050
}
51-
5251
pool.wg.Add(1)
5352
select {
53+
case pool.additionCh <- u:
5454
case <-pool.ctx.Done():
5555
pool.wg.Done()
56-
return
57-
case pool.additionCh <- u:
58-
return
59-
default:
60-
go func() {
61-
select {
62-
case pool.additionCh <- u:
63-
case <-pool.ctx.Done():
64-
pool.wg.Done()
65-
}
66-
}()
56+
}
57+
}
58+
59+
func (pool *BasePool) sendProcess(bl *baseline.Baseline) {
60+
select {
61+
case pool.processCh <- bl:
62+
case <-pool.ctx.Done():
6763
}
6864
}
6965

@@ -72,11 +68,19 @@ func (pool *BasePool) putToOutput(bl *baseline.Baseline) {
7268
bl.Collect()
7369
}
7470
pool.Outwg.Add(1)
75-
pool.OutputCh <- bl
71+
select {
72+
case pool.OutputCh <- bl:
73+
case <-pool.ctx.Done():
74+
pool.Outwg.Done()
75+
}
7676
}
7777

7878
func (pool *BasePool) putToFuzzy(bl *baseline.Baseline) {
7979
pool.Outwg.Add(1)
8080
bl.IsFuzzy = true
81-
pool.FuzzyCh <- bl
81+
select {
82+
case pool.FuzzyCh <- bl:
83+
case <-pool.ctx.Done():
84+
pool.Outwg.Done()
85+
}
8286
}

0 commit comments

Comments
 (0)