Skip to content

Commit a6a72f0

Browse files
committed
fix: avoid deadlocking singleflighted reqs
1 parent 97811cf commit a6a72f0

3 files changed

Lines changed: 31 additions & 0 deletions

File tree

internal/datastore/proxy/singleflight.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package proxy
22

33
import (
44
"context"
5+
"time"
56

67
"resenje.org/singleflight"
78

@@ -43,18 +44,30 @@ func (p *singleflightProxy) ReadWriteTx(ctx context.Context, f datastore.TxUserF
4344
func (p *singleflightProxy) OptimizedRevision(ctx context.Context) (datastore.Revision, error) {
4445
// NOTE: Optimized revisions are singleflighted by the underlying datastore via the
4546
// CachedOptimizedRevisions struct.
47+
ctx, span := tracer.Start(ctx, "singleflightProxy.OptimizedRevision")
48+
defer span.End()
4649
return p.delegate.OptimizedRevision(ctx)
4750
}
4851

4952
func (p *singleflightProxy) CheckRevision(ctx context.Context, revision datastore.Revision) error {
53+
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
54+
defer cancel()
55+
5056
_, _, err := p.checkRevGroup.Do(ctx, revision.String(), func(ctx context.Context) (string, error) {
57+
ctx, span := tracer.Start(ctx, "singleflightProxy.CheckRevision(sf)")
58+
defer span.End()
5159
return "", p.delegate.CheckRevision(ctx, revision)
5260
})
5361
return err
5462
}
5563

5664
func (p *singleflightProxy) HeadRevision(ctx context.Context) (datastore.Revision, error) {
65+
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
66+
defer cancel()
67+
5768
rev, _, err := p.headRevGroup.Do(ctx, "", func(ctx context.Context) (datastore.Revision, error) {
69+
ctx, span := tracer.Start(ctx, "singleflightProxy.HeadRevision(sf)")
70+
defer span.End()
5871
return p.delegate.HeadRevision(ctx)
5972
})
6073
return rev, err
@@ -69,6 +82,9 @@ func (p *singleflightProxy) Watch(ctx context.Context, afterRevision datastore.R
6982
}
7083

7184
func (p *singleflightProxy) Statistics(ctx context.Context) (datastore.Stats, error) {
85+
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
86+
defer cancel()
87+
7288
stats, _, err := p.statsGroup.Do(ctx, "", func(ctx context.Context) (datastore.Stats, error) {
7389
return p.delegate.Statistics(ctx)
7490
})

internal/datastore/revisions/optimized.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,12 @@ func (cor *CachedOptimizedRevisions) OptimizedRevision(ctx context.Context) (dat
6363
}
6464
cor.RUnlock()
6565

66+
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
67+
defer cancel()
68+
6669
newQuantizedRevision, _, err := cor.updateGroup.Do(ctx, "", func(ctx context.Context) (datastore.Revision, error) {
70+
ctx, span := tracer.Start(ctx, "CachedOptimizedRevisions.OptimizedRevision(sf)")
71+
defer span.End()
6772
log.Ctx(ctx).Debug().Time("now", localNow).Msg("computing new revision")
6873

6974
optimized, validFor, err := cor.optimizedFunc(ctx)

pkg/datastore/context.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,14 @@ package datastore
33
import (
44
"context"
55

6+
"go.opentelemetry.io/otel"
7+
68
"github.com/authzed/spicedb/pkg/datastore/options"
79
core "github.com/authzed/spicedb/pkg/proto/core/v1"
810
)
911

12+
var tracer = otel.Tracer("spicedb/pkg/datastore/context")
13+
1014
// NewSeparatingContextDatastoreProxy severs any timeouts in the context being
1115
// passed to the datastore and only retains tracing metadata.
1216
//
@@ -49,14 +53,20 @@ func (p *ctxProxy) IsStrictReadModeEnabled() bool {
4953
}
5054

5155
func (p *ctxProxy) OptimizedRevision(ctx context.Context) (Revision, error) {
56+
ctx, span := tracer.Start(ctx, "ctxProxy.OptimizedRevision")
57+
defer span.End()
5258
return p.delegate.OptimizedRevision(context.WithoutCancel(ctx))
5359
}
5460

5561
func (p *ctxProxy) CheckRevision(ctx context.Context, revision Revision) error {
62+
ctx, span := tracer.Start(ctx, "ctxProxy.CheckRevision")
63+
defer span.End()
5664
return p.delegate.CheckRevision(context.WithoutCancel(ctx), revision)
5765
}
5866

5967
func (p *ctxProxy) HeadRevision(ctx context.Context) (Revision, error) {
68+
ctx, span := tracer.Start(ctx, "ctxProxy.HeadRevision")
69+
defer span.End()
6070
return p.delegate.HeadRevision(context.WithoutCancel(ctx))
6171
}
6272

0 commit comments

Comments
 (0)