Skip to content

Commit 1a87ad3

Browse files
committed
feat(query-backend): hedge GetRange calls to reduce tail latency
Symbol table fetches (locations, mappings, functions, strings, stacktraces) call GetRange against object storage and are a primary source of tail latency on the read path. This adds opt-in speculative hedging: after a configurable delay, a second parallel GetRange is issued; whichever response arrives first is used and the other is cancelled. The hedge wraps only the GetRange call, not the decode — the winning response body is decoded once. A new Cleanup field on retry.Hedged ensures the losing response body is always closed, preventing connection leaks when both calls succeed. Config flag (default 0 = disabled): --query-backend.block-read-hedge-after=<duration>
1 parent d8ee7ad commit 1a87ad3

8 files changed

Lines changed: 314 additions & 34 deletions

File tree

pkg/block/object.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"path/filepath"
88
"strconv"
99
"strings"
10+
"time"
1011

1112
"github.com/grafana/dskit/multierror"
1213
"github.com/oklog/ulid/v2"
@@ -39,6 +40,7 @@ type Object struct {
3940

4041
memSize int
4142
downloadDir string
43+
hedgeAfter time.Duration
4244
}
4345

4446
type ObjectOption func(*Object)
@@ -61,6 +63,12 @@ func WithObjectDownload(dir string) ObjectOption {
6163
}
6264
}
6365

66+
func WithObjectHedgeAfter(d time.Duration) ObjectOption {
67+
return func(obj *Object) {
68+
obj.hedgeAfter = d
69+
}
70+
}
71+
6472
func NewObjectFromPath(ctx context.Context, storage objstore.Bucket, path string, opts ...ObjectOption) (*Object, error) {
6573
attrs, err := storage.Attributes(ctx, path)
6674
if err != nil {

pkg/block/section_symbols.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,11 @@ func openSymbols(ctx context.Context, s *Dataset) (err error) {
1414
offset -= int64(s.offset())
1515
s.symbols, err = symdb.OpenObject(ctx, s.inMemoryBucket(buf), s.obj.path, offset, size)
1616
} else {
17-
s.symbols, err = symdb.OpenObject(ctx, s.obj.storage, s.obj.path, offset, size,
18-
symdb.WithPrefetchSize(symbolsPrefetchSize))
17+
opts := []symdb.Option{symdb.WithPrefetchSize(symbolsPrefetchSize)}
18+
if s.obj.hedgeAfter > 0 {
19+
opts = append(opts, symdb.WithHedgeAfter(s.obj.hedgeAfter))
20+
}
21+
s.symbols, err = symdb.OpenObject(ctx, s.obj.storage, s.obj.path, offset, size, opts...)
1922
}
2023
if err != nil {
2124
return fmt.Errorf("opening symbols: %w", err)

pkg/phlaredb/symdb/block_reader.go

Lines changed: 55 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"os"
1111
"path/filepath"
1212
"sync"
13+
"time"
1314

1415
"github.com/grafana/dskit/multierror"
1516
"github.com/grafana/dskit/tracing"
@@ -21,6 +22,7 @@ import (
2122
schemav1 "github.com/grafana/pyroscope/pkg/phlaredb/schemas/v1"
2223
"github.com/grafana/pyroscope/pkg/util/bufferpool"
2324
"github.com/grafana/pyroscope/pkg/util/refctr"
25+
"github.com/grafana/pyroscope/pkg/util/retry"
2426
)
2527

2628
type Reader struct {
@@ -38,6 +40,7 @@ type Reader struct {
3840
parquetFiles *parquetFiles
3941

4042
prefetchSize uint64
43+
hedgeAfter time.Duration
4144
}
4245

4346
type Option func(*Reader)
@@ -48,6 +51,12 @@ func WithPrefetchSize(size uint64) Option {
4851
}
4952
}
5053

54+
func WithHedgeAfter(d time.Duration) Option {
55+
return func(r *Reader) {
56+
r.hedgeAfter = d
57+
}
58+
}
59+
5160
func OpenObject(ctx context.Context, b objstore.BucketReader, name string, offset, size int64, options ...Option) (*Reader, error) {
5261
f := block.File{
5362
RelPath: name,
@@ -560,7 +569,7 @@ func (c *stacktraceBlock) fetch(ctx context.Context) error {
560569
if err != nil {
561570
return err
562571
}
563-
rc, err := c.reader.bucket.GetRange(ctx, path, c.header.Offset, c.header.Size)
572+
rc, err := c.getRange(ctx, path)
564573
if err != nil {
565574
return err
566575
}
@@ -573,15 +582,19 @@ func (c *stacktraceBlock) fetch(ctx context.Context) error {
573582
})
574583
}
575584

576-
func (c *stacktraceBlock) stacktracesFile() (string, error) {
577-
f := c.reader.file
578-
if c.reader.index.Header.Version < 3 {
579-
var err error
580-
if f, err = c.reader.lookupFile(StacktracesFileName); err != nil {
581-
return "", err
582-
}
585+
// getRange issues a GetRange call, hedging with a second call if hedgeAfter is set.
586+
func (c *stacktraceBlock) getRange(ctx context.Context, path string) (io.ReadCloser, error) {
587+
if c.reader.hedgeAfter <= 0 {
588+
return c.reader.bucket.GetRange(ctx, path, c.header.Offset, c.header.Size)
583589
}
584-
return f.RelPath, nil
590+
return retry.Hedged[io.ReadCloser]{
591+
Trigger: time.After(c.reader.hedgeAfter),
592+
FailFast: true,
593+
Cleanup: func(rc io.ReadCloser) { rc.Close() },
594+
Call: func(ctx context.Context, _ bool) (io.ReadCloser, error) {
595+
return c.reader.bucket.GetRange(ctx, path, c.header.Offset, c.header.Size)
596+
},
597+
}.Do(ctx)
585598
}
586599

587600
func (c *stacktraceBlock) readFrom(r *bufio.Reader) error {
@@ -607,6 +620,17 @@ func (c *stacktraceBlock) readFrom(r *bufio.Reader) error {
607620
return nil
608621
}
609622

623+
func (c *stacktraceBlock) stacktracesFile() (string, error) {
624+
f := c.reader.file
625+
if c.reader.index.Header.Version < 3 {
626+
var err error
627+
if f, err = c.reader.lookupFile(StacktracesFileName); err != nil {
628+
return "", err
629+
}
630+
}
631+
return f.RelPath, nil
632+
}
633+
610634
func (c *stacktraceBlock) release() {
611635
c.r.Dec(func() {
612636
c.t = nil
@@ -627,10 +651,7 @@ func (t *rawTable[T]) fetch(ctx context.Context) error {
627651
span.SetTag("length", t.header.Length)
628652
defer span.Finish()
629653
return t.r.Inc(func() error {
630-
rc, err := t.reader.bucket.GetRange(ctx,
631-
t.reader.file.RelPath,
632-
int64(t.header.Offset),
633-
int64(t.header.Size))
654+
rc, err := t.getRange(ctx)
634655
if err != nil {
635656
return err
636657
}
@@ -643,6 +664,27 @@ func (t *rawTable[T]) fetch(ctx context.Context) error {
643664
})
644665
}
645666

667+
// getRange issues a GetRange call, hedging with a second call if hedgeAfter is set.
668+
func (t *rawTable[T]) getRange(ctx context.Context) (io.ReadCloser, error) {
669+
if t.reader.hedgeAfter <= 0 {
670+
return t.reader.bucket.GetRange(ctx,
671+
t.reader.file.RelPath,
672+
int64(t.header.Offset),
673+
int64(t.header.Size))
674+
}
675+
return retry.Hedged[io.ReadCloser]{
676+
Trigger: time.After(t.reader.hedgeAfter),
677+
FailFast: true,
678+
Cleanup: func(rc io.ReadCloser) { rc.Close() },
679+
Call: func(ctx context.Context, _ bool) (io.ReadCloser, error) {
680+
return t.reader.bucket.GetRange(ctx,
681+
t.reader.file.RelPath,
682+
int64(t.header.Offset),
683+
int64(t.header.Size))
684+
},
685+
}.Do(ctx)
686+
}
687+
646688
func (t *rawTable[T]) readFrom(r *bufio.Reader) error {
647689
crc := crc32.New(castagnoli)
648690
tee := io.TeeReader(r, crc)

pkg/pyroscope/modules_experimental.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -370,7 +370,8 @@ func (f *Pyroscope) initQueryBackend() (services.Service, error) {
370370
logger,
371371
f.reg,
372372
f.queryBackendClient,
373-
querybackend.NewBlockReader(f.logger, f.storageBucket, f.reg),
373+
querybackend.NewBlockReader(f.logger, f.storageBucket, f.reg,
374+
querybackend.WithBlockReaderHedgeAfter(f.Cfg.QueryBackend.BlockReadHedgeAfter)),
374375
)
375376
if err != nil {
376377
return nil, err

pkg/querybackend/backend.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,16 @@ import (
2121
)
2222

2323
type Config struct {
24-
Address string `yaml:"address"`
25-
GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config" doc:"description=Configures the gRPC client used to communicate between the query-frontends and the query-schedulers."`
26-
ClientTimeout time.Duration `yaml:"client_timeout"`
24+
Address string `yaml:"address"`
25+
GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config" doc:"description=Configures the gRPC client used to communicate between the query-frontends and the query-schedulers."`
26+
ClientTimeout time.Duration `yaml:"client_timeout"`
27+
BlockReadHedgeAfter time.Duration `yaml:"block_read_hedge_after" category:"advanced"`
2728
}
2829

2930
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
3031
f.StringVar(&cfg.Address, "query-backend.address", "localhost:9095", "")
3132
f.DurationVar(&cfg.ClientTimeout, "query-backend.client-timeout", 30*time.Second, "Timeout for query-backend client requests.")
33+
f.DurationVar(&cfg.BlockReadHedgeAfter, "query-backend.block-read-hedge-after", 0, "If non-zero, issue a speculative second GetRange request for symbol tables after this duration. 0 disables hedging.")
3234
cfg.GRPCClientConfig.RegisterFlagsWithPrefix("query-backend.grpc-client-config", f)
3335
}
3436

pkg/querybackend/block_reader.go

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,9 @@ type BlockReader struct {
5050
log log.Logger
5151
storage objstore.Bucket
5252

53-
metrics *metrics
54-
hostname string
53+
metrics *metrics
54+
hostname string
55+
hedgeAfter time.Duration
5556

5657
// TODO:
5758
// - Use a worker pool instead of the errgroup.
@@ -61,14 +62,26 @@ type BlockReader struct {
6162
// Instead, they should share the processing pipeline, if possible.
6263
}
6364

64-
func NewBlockReader(logger log.Logger, storage objstore.Bucket, reg prometheus.Registerer) *BlockReader {
65+
func NewBlockReader(logger log.Logger, storage objstore.Bucket, reg prometheus.Registerer, opts ...BlockReaderOption) *BlockReader {
6566
hostname, _ := os.Hostname()
66-
return &BlockReader{
67+
br := &BlockReader{
6768
log: logger,
6869
storage: storage,
6970
metrics: newMetrics(reg),
7071
hostname: hostname,
7172
}
73+
for _, opt := range opts {
74+
opt(br)
75+
}
76+
return br
77+
}
78+
79+
type BlockReaderOption func(*BlockReader)
80+
81+
func WithBlockReaderHedgeAfter(d time.Duration) BlockReaderOption {
82+
return func(br *BlockReader) {
83+
br.hedgeAfter = d
84+
}
7285
}
7386

7487
func (b *BlockReader) Invoke(
@@ -115,7 +128,7 @@ func (b *BlockReader) Invoke(
115128
}
116129
blocksCount++
117130
datasetsCount += int64(len(md.Datasets))
118-
obj := block.NewObject(b.storage, md)
131+
obj := block.NewObject(b.storage, md, block.WithObjectHedgeAfter(b.hedgeAfter))
119132
g.Go(util.RecoverPanic((&blockContext{
120133
ctx: ctx,
121134
log: b.log,

pkg/util/retry/hedged.go

Lines changed: 45 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -26,23 +26,36 @@ type Hedged[T any] struct {
2626
// - the result received first is returned, regardless of anything.
2727
// - if Call fails before the trigger fires, it won't be retried.
2828
FailFast bool
29+
30+
// Cleanup is called on the result of a losing attempt when it succeeded
31+
// but another attempt already won. Use this to release resources (e.g.,
32+
// close an io.ReadCloser) that would otherwise be abandoned.
33+
Cleanup func(T)
2934
}
3035

3136
type Call[T any] func(ctx context.Context, isRetry bool) (T, error)
3237

3338
func (s Hedged[T]) Do(ctx context.Context) (T, error) {
34-
attemptCtx, cancel := context.WithCancel(ctx)
35-
defer cancel()
39+
// Each attempt gets its own independent cancellable context derived from
40+
// the parent. The winner cancels only the loser — never itself — so that
41+
// values tied to the winning context (e.g. an io.ReadCloser backed by an
42+
// HTTP connection) remain usable after Do returns. The winner's context
43+
// lives until the parent ctx is cancelled.
44+
ctx1, cancel1 := context.WithCancel(ctx)
45+
ctx2, cancel2 := context.WithCancel(ctx)
46+
3647
var (
3748
ret T
3849
err error
3950
failed uint64
4051

4152
wg sync.WaitGroup
4253
do sync.Once
54+
// decided is closed by the goroutine that stores the winning result.
55+
decided = make(chan struct{})
4356
)
4457

45-
attempt := func(isRetry bool) {
58+
attempt := func(attemptCtx context.Context, cancelSelf, cancelOther context.CancelFunc, isRetry bool) {
4659
wg.Add(1)
4760
go func() {
4861
defer wg.Done()
@@ -55,21 +68,43 @@ func (s Hedged[T]) Do(ctx context.Context) (T, error) {
5568
// ongoing attempt.
5669
return
5770
}
58-
// If there is an ongoing attempt, it will be cancelled,
59-
// because we already got the result.
60-
cancel()
71+
stored := false
6172
do.Do(func() {
6273
ret, err = attemptRet, attemptErr
74+
stored = true
75+
close(decided)
6376
})
77+
if stored {
78+
// We won: cancel the other attempt.
79+
// Do NOT cancel our own context — the caller may still be
80+
// reading from our result (e.g. io.ReadCloser).
81+
cancelOther()
82+
} else {
83+
// We lost: cancel our own context and release our result.
84+
cancelSelf()
85+
if attemptErr == nil && s.Cleanup != nil {
86+
s.Cleanup(attemptRet)
87+
}
88+
}
6489
}()
6590
}
6691

67-
attempt(false)
92+
attempt(ctx1, cancel1, cancel2, false)
6893
select {
69-
case <-attemptCtx.Done():
70-
// Call has returned, or caller cancelled the request.
94+
case <-decided:
95+
// A winner was found before the trigger fired.
96+
case <-ctx.Done():
97+
// Caller cancelled: abort both attempts.
98+
cancel1()
99+
cancel2()
71100
case <-s.Trigger:
72-
attempt(true)
101+
attempt(ctx2, cancel2, cancel1, true)
102+
select {
103+
case <-decided:
104+
case <-ctx.Done():
105+
cancel1()
106+
cancel2()
107+
}
73108
}
74109

75110
wg.Wait()

0 commit comments

Comments
 (0)