Skip to content

Commit 6c73af5

Browse files
authored
scrape: timeout WriteRaw, drop sp.mtx before waiting on loops (#6361)
The per-target scrape loop bounded only the HTTP scrape with scrape_timeout; the subsequent store.WriteRaw call ran under the bare scrapeCtx with no deadline. When write latency spiked, each iteration blocked indefinitely inside WriteRaw and the target appeared to stop scraping (time.Ticker drops missed ticks instead of queuing them, so at most one make-up scrape fires after the call returns). WriteRaw now runs under context.WithTimeout(scrapeCtx, scrape_timeout) so a slow store fails the scrape and the next interval fires on time. scrapePool.stop() and reload() also held sp.mtx while calling wg.Wait() for the old loops to terminate. A loop wedged inside WriteRaw would therefore keep sp.mtx held, and — via Manager.ApplyConfig / Stop holding m.mtxScrape — block the manager's Run loop from draining new target sets, freezing every pool. Both methods now snapshot the loops under sp.mtx, release the lock, and wait on goroutines outside it, matching the pattern already used by sync().
1 parent 470ff04 commit 6c73af5

1 file changed

Lines changed: 34 additions & 16 deletions

File tree

pkg/scrape/scrape.go

Lines changed: 34 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -140,21 +140,26 @@ func (sp *scrapePool) DroppedTargets() []*Target {
140140
// stop terminates all scrape loops and returns after they all terminated.
141141
func (sp *scrapePool) stop() {
142142
sp.cancel()
143-
var wg sync.WaitGroup
144143

144+
// Snapshot loops under the lock, then release before waiting — same
145+
// reasoning as sync(): a hung scrape loop must not be able to wedge
146+
// other sp.mtx readers (and, transitively, the manager's mtxScrape).
145147
sp.mtx.Lock()
146-
defer sp.mtx.Unlock()
147-
148+
toStop := make([]loop, 0, len(sp.loops))
148149
for fp, l := range sp.loops {
149-
wg.Add(1)
150+
toStop = append(toStop, l)
151+
delete(sp.loops, fp)
152+
delete(sp.activeTargets, fp)
153+
}
154+
sp.mtx.Unlock()
150155

156+
var wg sync.WaitGroup
157+
for _, l := range toStop {
158+
wg.Add(1)
151159
go func(l loop) {
152160
l.stop()
153161
wg.Done()
154162
}(l)
155-
156-
delete(sp.loops, fp)
157-
delete(sp.activeTargets, fp)
158163
}
159164
wg.Wait()
160165
}
@@ -166,7 +171,6 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) {
166171
start := time.Now()
167172

168173
sp.mtx.Lock()
169-
defer sp.mtx.Unlock()
170174

171175
client, err := commonconfig.NewClientFromConfig(cfg.HTTPClientConfig, cfg.JobName)
172176
if err != nil {
@@ -177,27 +181,36 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) {
177181
sp.client = client
178182

179183
var (
180-
wg sync.WaitGroup
181184
interval = time.Duration(sp.config.ScrapeInterval)
182185
timeout = time.Duration(sp.config.ScrapeTimeout)
183186
)
184187

188+
// Swap old loops for new under the lock, but don't wait for the old
189+
// loops to stop while still holding sp.mtx — same reasoning as sync():
190+
// a slow oldLoop.stop() must not wedge other sp.mtx readers (and,
191+
// transitively, the manager's mtxScrape).
192+
type loopPair struct{ oldLoop, newLoop loop }
193+
swaps := make([]loopPair, 0, len(sp.loops))
185194
for fp, oldLoop := range sp.loops {
186195
var (
187196
t = sp.activeTargets[fp]
188197
s = &targetScraper{Target: t, logger: sp.logger, client: sp.client, timeout: timeout}
189198
newLoop = sp.newLoop(t, s)
190199
)
191-
wg.Add(1)
200+
swaps = append(swaps, loopPair{oldLoop: oldLoop, newLoop: newLoop})
201+
sp.loops[fp] = newLoop
202+
}
203+
sp.mtx.Unlock()
192204

205+
var wg sync.WaitGroup
206+
for _, p := range swaps {
207+
wg.Add(1)
193208
go func(oldLoop, newLoop loop) {
194209
oldLoop.stop()
195210
wg.Done()
196211

197212
go newLoop.run(interval, timeout, nil)
198-
}(oldLoop, newLoop)
199-
200-
sp.loops[fp] = newLoop
213+
}(p.oldLoop, p.newLoop)
201214
}
202215

203216
wg.Wait()
@@ -470,7 +483,12 @@ mainLoop:
470483
cancel()
471484

472485
if scrapeErr == nil {
473-
err := processScrapeResp(buf, sl, profileType)
486+
// Bound the write the same way the scrape itself is bounded.
487+
// Without this, a slow store.WriteRaw stalls the per-target
488+
// loop indefinitely and the target appears to stop scraping.
489+
writeCtx, writeCancel := context.WithTimeout(sl.scrapeCtx, timeout)
490+
err := processScrapeResp(writeCtx, buf, sl, profileType)
491+
writeCancel()
474492
if err != nil {
475493
if errc != nil {
476494
errc <- err
@@ -509,7 +527,7 @@ mainLoop:
509527
close(sl.stopped)
510528
}
511529

512-
func processScrapeResp(buf *bytes.Buffer, sl *scrapeLoop, profileType string) error {
530+
func processScrapeResp(ctx context.Context, buf *bytes.Buffer, sl *scrapeLoop, profileType string) error {
513531
b := buf.Bytes()
514532
defer sl.buffers.Put(b)
515533
// NOTE: There were issues with misbehaving clients in the past
@@ -586,7 +604,7 @@ func processScrapeResp(buf *bytes.Buffer, sl *scrapeLoop, profileType string) er
586604
byt = newBuf.Bytes()
587605
}
588606

589-
_, err = sl.store.WriteRaw(sl.scrapeCtx, &profilepb.WriteRawRequest{
607+
_, err = sl.store.WriteRaw(ctx, &profilepb.WriteRawRequest{
590608
Normalized: sl.normalizedAddresses,
591609
Series: []*profilepb.RawProfileSeries{
592610
{

0 commit comments

Comments
 (0)