Skip to content

Commit a432037

Browse files
committed
feat(query-backend): hedge GetRange calls to reduce tail latency
Symbol table fetches (locations, mappings, functions, strings, stacktraces) call GetRange against object storage and are a primary source of tail latency on the read path. This adds opt-in speculative hedging: after a configurable delay, a second parallel GetRange is issued; whichever response arrives first is used and the other is cancelled. The hedge wraps only the GetRange call, not the decode — the winning response body is decoded once. A new Cleanup field on retry.Hedged ensures the losing response body is always closed, preventing connection leaks when both calls succeed. Config flag (default 0 = disabled): --query-backend.block-read-hedge-after=<duration>
1 parent 855444b commit a432037

8 files changed

Lines changed: 206 additions & 24 deletions

File tree

pkg/block/object.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"path/filepath"
88
"strconv"
99
"strings"
10+
"time"
1011

1112
"github.com/grafana/dskit/multierror"
1213
"github.com/oklog/ulid/v2"
@@ -39,6 +40,7 @@ type Object struct {
3940

4041
memSize int
4142
downloadDir string
43+
hedgeAfter time.Duration
4244
}
4345

4446
type ObjectOption func(*Object)
@@ -61,6 +63,12 @@ func WithObjectDownload(dir string) ObjectOption {
6163
}
6264
}
6365

66+
func WithObjectHedgeAfter(d time.Duration) ObjectOption {
67+
return func(obj *Object) {
68+
obj.hedgeAfter = d
69+
}
70+
}
71+
6472
func NewObjectFromPath(ctx context.Context, storage objstore.Bucket, path string, opts ...ObjectOption) (*Object, error) {
6573
attrs, err := storage.Attributes(ctx, path)
6674
if err != nil {

pkg/block/section_symbols.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,11 @@ func openSymbols(ctx context.Context, s *Dataset) (err error) {
1414
offset -= int64(s.offset())
1515
s.symbols, err = symdb.OpenObject(ctx, s.inMemoryBucket(buf), s.obj.path, offset, size)
1616
} else {
17-
s.symbols, err = symdb.OpenObject(ctx, s.obj.storage, s.obj.path, offset, size,
18-
symdb.WithPrefetchSize(symbolsPrefetchSize))
17+
opts := []symdb.Option{symdb.WithPrefetchSize(symbolsPrefetchSize)}
18+
if s.obj.hedgeAfter > 0 {
19+
opts = append(opts, symdb.WithHedgeAfter(s.obj.hedgeAfter))
20+
}
21+
s.symbols, err = symdb.OpenObject(ctx, s.obj.storage, s.obj.path, offset, size, opts...)
1922
}
2023
if err != nil {
2124
return fmt.Errorf("opening symbols: %w", err)

pkg/phlaredb/symdb/block_reader.go

Lines changed: 55 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"os"
1111
"path/filepath"
1212
"sync"
13+
"time"
1314

1415
"github.com/grafana/dskit/multierror"
1516
"github.com/grafana/dskit/tracing"
@@ -21,6 +22,7 @@ import (
2122
schemav1 "github.com/grafana/pyroscope/pkg/phlaredb/schemas/v1"
2223
"github.com/grafana/pyroscope/pkg/util/bufferpool"
2324
"github.com/grafana/pyroscope/pkg/util/refctr"
25+
"github.com/grafana/pyroscope/pkg/util/retry"
2426
)
2527

2628
type Reader struct {
@@ -38,6 +40,7 @@ type Reader struct {
3840
parquetFiles *parquetFiles
3941

4042
prefetchSize uint64
43+
hedgeAfter time.Duration
4144
}
4245

4346
type Option func(*Reader)
@@ -48,6 +51,12 @@ func WithPrefetchSize(size uint64) Option {
4851
}
4952
}
5053

54+
func WithHedgeAfter(d time.Duration) Option {
55+
return func(r *Reader) {
56+
r.hedgeAfter = d
57+
}
58+
}
59+
5160
func OpenObject(ctx context.Context, b objstore.BucketReader, name string, offset, size int64, options ...Option) (*Reader, error) {
5261
f := block.File{
5362
RelPath: name,
@@ -560,7 +569,7 @@ func (c *stacktraceBlock) fetch(ctx context.Context) error {
560569
if err != nil {
561570
return err
562571
}
563-
rc, err := c.reader.bucket.GetRange(ctx, path, c.header.Offset, c.header.Size)
572+
rc, err := c.getRange(ctx, path)
564573
if err != nil {
565574
return err
566575
}
@@ -573,15 +582,19 @@ func (c *stacktraceBlock) fetch(ctx context.Context) error {
573582
})
574583
}
575584

576-
func (c *stacktraceBlock) stacktracesFile() (string, error) {
577-
f := c.reader.file
578-
if c.reader.index.Header.Version < 3 {
579-
var err error
580-
if f, err = c.reader.lookupFile(StacktracesFileName); err != nil {
581-
return "", err
582-
}
585+
// getRange issues a GetRange call, hedging with a second call if hedgeAfter is set.
586+
func (c *stacktraceBlock) getRange(ctx context.Context, path string) (io.ReadCloser, error) {
587+
if c.reader.hedgeAfter <= 0 {
588+
return c.reader.bucket.GetRange(ctx, path, c.header.Offset, c.header.Size)
583589
}
584-
return f.RelPath, nil
590+
return retry.Hedged[io.ReadCloser]{
591+
Trigger: time.After(c.reader.hedgeAfter),
592+
FailFast: true,
593+
Cleanup: func(rc io.ReadCloser) { rc.Close() },
594+
Call: func(ctx context.Context, _ bool) (io.ReadCloser, error) {
595+
return c.reader.bucket.GetRange(ctx, path, c.header.Offset, c.header.Size)
596+
},
597+
}.Do(ctx)
585598
}
586599

587600
func (c *stacktraceBlock) readFrom(r *bufio.Reader) error {
@@ -607,6 +620,17 @@ func (c *stacktraceBlock) readFrom(r *bufio.Reader) error {
607620
return nil
608621
}
609622

623+
func (c *stacktraceBlock) stacktracesFile() (string, error) {
624+
f := c.reader.file
625+
if c.reader.index.Header.Version < 3 {
626+
var err error
627+
if f, err = c.reader.lookupFile(StacktracesFileName); err != nil {
628+
return "", err
629+
}
630+
}
631+
return f.RelPath, nil
632+
}
633+
610634
func (c *stacktraceBlock) release() {
611635
c.r.Dec(func() {
612636
c.t = nil
@@ -627,10 +651,7 @@ func (t *rawTable[T]) fetch(ctx context.Context) error {
627651
span.SetTag("length", t.header.Length)
628652
defer span.Finish()
629653
return t.r.Inc(func() error {
630-
rc, err := t.reader.bucket.GetRange(ctx,
631-
t.reader.file.RelPath,
632-
int64(t.header.Offset),
633-
int64(t.header.Size))
654+
rc, err := t.getRange(ctx)
634655
if err != nil {
635656
return err
636657
}
@@ -643,6 +664,27 @@ func (t *rawTable[T]) fetch(ctx context.Context) error {
643664
})
644665
}
645666

667+
// getRange issues a GetRange call, hedging with a second call if hedgeAfter is set.
668+
func (t *rawTable[T]) getRange(ctx context.Context) (io.ReadCloser, error) {
669+
if t.reader.hedgeAfter <= 0 {
670+
return t.reader.bucket.GetRange(ctx,
671+
t.reader.file.RelPath,
672+
int64(t.header.Offset),
673+
int64(t.header.Size))
674+
}
675+
return retry.Hedged[io.ReadCloser]{
676+
Trigger: time.After(t.reader.hedgeAfter),
677+
FailFast: true,
678+
Cleanup: func(rc io.ReadCloser) { rc.Close() },
679+
Call: func(ctx context.Context, _ bool) (io.ReadCloser, error) {
680+
return t.reader.bucket.GetRange(ctx,
681+
t.reader.file.RelPath,
682+
int64(t.header.Offset),
683+
int64(t.header.Size))
684+
},
685+
}.Do(ctx)
686+
}
687+
646688
func (t *rawTable[T]) readFrom(r *bufio.Reader) error {
647689
crc := crc32.New(castagnoli)
648690
tee := io.TeeReader(r, crc)

pkg/pyroscope/modules_experimental.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -370,7 +370,8 @@ func (f *Pyroscope) initQueryBackend() (services.Service, error) {
370370
logger,
371371
f.reg,
372372
f.queryBackendClient,
373-
querybackend.NewBlockReader(f.logger, f.storageBucket, f.reg),
373+
querybackend.NewBlockReader(f.logger, f.storageBucket, f.reg,
374+
querybackend.WithBlockReaderHedgeAfter(f.Cfg.QueryBackend.BlockReadHedgeAfter)),
374375
)
375376
if err != nil {
376377
return nil, err

pkg/querybackend/backend.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,16 @@ import (
2121
)
2222

2323
type Config struct {
24-
Address string `yaml:"address"`
25-
GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config" doc:"description=Configures the gRPC client used to communicate between the query-frontends and the query-schedulers."`
26-
ClientTimeout time.Duration `yaml:"client_timeout"`
24+
Address string `yaml:"address"`
25+
GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config" doc:"description=Configures the gRPC client used to communicate between the query-frontends and the query-schedulers."`
26+
ClientTimeout time.Duration `yaml:"client_timeout"`
27+
BlockReadHedgeAfter time.Duration `yaml:"block_read_hedge_after" category:"advanced"`
2728
}
2829

2930
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
3031
f.StringVar(&cfg.Address, "query-backend.address", "localhost:9095", "")
3132
f.DurationVar(&cfg.ClientTimeout, "query-backend.client-timeout", 30*time.Second, "Timeout for query-backend client requests.")
33+
f.DurationVar(&cfg.BlockReadHedgeAfter, "query-backend.block-read-hedge-after", 0, "If non-zero, issue a speculative second GetRange request for symbol tables after this duration. 0 disables hedging.")
3234
cfg.GRPCClientConfig.RegisterFlagsWithPrefix("query-backend.grpc-client-config", f)
3335
}
3436

pkg/querybackend/block_reader.go

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,9 @@ type BlockReader struct {
5050
log log.Logger
5151
storage objstore.Bucket
5252

53-
metrics *metrics
54-
hostname string
53+
metrics *metrics
54+
hostname string
55+
hedgeAfter time.Duration
5556

5657
// TODO:
5758
// - Use a worker pool instead of the errgroup.
@@ -61,14 +62,26 @@ type BlockReader struct {
6162
// Instead, they should share the processing pipeline, if possible.
6263
}
6364

64-
func NewBlockReader(logger log.Logger, storage objstore.Bucket, reg prometheus.Registerer) *BlockReader {
65+
func NewBlockReader(logger log.Logger, storage objstore.Bucket, reg prometheus.Registerer, opts ...BlockReaderOption) *BlockReader {
6566
hostname, _ := os.Hostname()
66-
return &BlockReader{
67+
br := &BlockReader{
6768
log: logger,
6869
storage: storage,
6970
metrics: newMetrics(reg),
7071
hostname: hostname,
7172
}
73+
for _, opt := range opts {
74+
opt(br)
75+
}
76+
return br
77+
}
78+
79+
type BlockReaderOption func(*BlockReader)
80+
81+
func WithBlockReaderHedgeAfter(d time.Duration) BlockReaderOption {
82+
return func(br *BlockReader) {
83+
br.hedgeAfter = d
84+
}
7285
}
7386

7487
func (b *BlockReader) Invoke(
@@ -115,7 +128,7 @@ func (b *BlockReader) Invoke(
115128
}
116129
blocksCount++
117130
datasetsCount += int64(len(md.Datasets))
118-
obj := block.NewObject(b.storage, md)
131+
obj := block.NewObject(b.storage, md, block.WithObjectHedgeAfter(b.hedgeAfter))
119132
g.Go(util.RecoverPanic((&blockContext{
120133
ctx: ctx,
121134
log: b.log,

pkg/util/retry/hedged.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,11 @@ type Hedged[T any] struct {
2626
// - the result received first is returned, regardless of anything.
2727
// - if Call fails before the trigger fires, it won't be retried.
2828
FailFast bool
29+
30+
// Cleanup is called on the result of a losing attempt when it succeeded
31+
// but another attempt already won. Use this to release resources (e.g.,
32+
// close an io.ReadCloser) that would otherwise be abandoned.
33+
Cleanup func(T)
2934
}
3035

3136
type Call[T any] func(ctx context.Context, isRetry bool) (T, error)
@@ -58,9 +63,14 @@ func (s Hedged[T]) Do(ctx context.Context) (T, error) {
5863
// If there is an ongoing attempt, it will be cancelled,
5964
// because we already got the result.
6065
cancel()
66+
stored := false
6167
do.Do(func() {
6268
ret, err = attemptRet, attemptErr
69+
stored = true
6370
})
71+
if !stored && attemptErr == nil && s.Cleanup != nil {
72+
s.Cleanup(attemptRet)
73+
}
6474
}()
6575
}
6676

pkg/util/retry/hedged_test.go

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package retry
33
import (
44
"context"
55
"errors"
6+
"sync/atomic"
67
"testing"
78
"testing/synctest"
89
"time"
@@ -135,3 +136,105 @@ func Test_Hedging(t *testing.T) {
135136
})
136137
}
137138
}
139+
140+
func Test_Hedging_Cleanup(t *testing.T) {
141+
t.Run("cleanup called on loser when both succeed", func(t *testing.T) {
142+
synctest.Test(t, func(t *testing.T) {
143+
var cleaned int64
144+
// First call is slow; hedge fires and wins. First call then
145+
// succeeds but loses — Cleanup must be called on its result.
146+
const hedgeDelay = time.Second
147+
a := Hedged[*int]{
148+
Trigger: time.After(hedgeDelay),
149+
FailFast: true,
150+
Cleanup: func(v *int) { atomic.AddInt64(&cleaned, 1) },
151+
Call: func(ctx context.Context, isRetry bool) (*int, error) {
152+
if !isRetry {
153+
// slow: block until cancelled by the winning hedge
154+
<-ctx.Done()
155+
}
156+
v := 1
157+
return &v, nil
158+
},
159+
}
160+
done := make(chan struct{})
161+
go func() {
162+
defer close(done)
163+
_, err := a.Do(context.Background())
164+
if err != nil {
165+
t.Errorf("unexpected error: %v", err)
166+
}
167+
}()
168+
synctest.Wait()
169+
time.Sleep(hedgeDelay) // hedge fires and wins; slow call gets cancelled
170+
synctest.Wait()
171+
<-done
172+
if atomic.LoadInt64(&cleaned) != 1 {
173+
t.Fatal("expected Cleanup to be called exactly once on the loser")
174+
}
175+
})
176+
})
177+
178+
t.Run("cleanup not called when only one attempt runs", func(t *testing.T) {
179+
synctest.Test(t, func(t *testing.T) {
180+
var cleaned int64
181+
a := Hedged[*int]{
182+
Trigger: time.After(time.Hour),
183+
FailFast: true,
184+
Cleanup: func(v *int) { atomic.AddInt64(&cleaned, 1) },
185+
Call: func(ctx context.Context, _ bool) (*int, error) {
186+
v := 1
187+
return &v, nil
188+
},
189+
}
190+
_, err := a.Do(context.Background())
191+
if err != nil {
192+
t.Fatalf("unexpected error: %v", err)
193+
}
194+
if atomic.LoadInt64(&cleaned) != 0 {
195+
t.Fatal("expected Cleanup not to be called")
196+
}
197+
})
198+
})
199+
200+
t.Run("cleanup not called on loser that errored", func(t *testing.T) {
201+
synctest.Test(t, func(t *testing.T) {
202+
var cleaned int64
203+
e := errors.New("fail")
204+
const hedgeDelay = time.Second
205+
// Hedge fires; first call errors, second succeeds.
206+
// Cleanup must NOT be called on the errored loser.
207+
firstBlocked := make(chan struct{})
208+
a := Hedged[*int]{
209+
Trigger: time.After(hedgeDelay),
210+
FailFast: false,
211+
Cleanup: func(v *int) { atomic.AddInt64(&cleaned, 1) },
212+
Call: func(ctx context.Context, isRetry bool) (*int, error) {
213+
if !isRetry {
214+
close(firstBlocked)
215+
<-ctx.Done()
216+
return nil, e
217+
}
218+
v := 1
219+
return &v, nil
220+
},
221+
}
222+
done := make(chan struct{})
223+
go func() {
224+
defer close(done)
225+
_, err := a.Do(context.Background())
226+
if err != nil {
227+
t.Errorf("unexpected error: %v", err)
228+
}
229+
}()
230+
<-firstBlocked
231+
synctest.Wait()
232+
time.Sleep(hedgeDelay)
233+
synctest.Wait()
234+
<-done
235+
if atomic.LoadInt64(&cleaned) != 0 {
236+
t.Fatal("expected Cleanup not to be called on errored attempt")
237+
}
238+
})
239+
})
240+
}

0 commit comments

Comments (0)