Skip to content

Commit 866fb65

Browse files
committed
fix: avoid deadlocking singleflighted reqs
1 parent b371613 commit 866fb65

3 files changed

Lines changed: 34 additions & 4 deletions

File tree

internal/datastore/proxy/singleflight.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,19 +43,29 @@ func (p *singleflightProxy) ReadWriteTx(ctx context.Context, f datastore.TxUserF
4343
func (p *singleflightProxy) OptimizedRevision(ctx context.Context) (datastore.Revision, error) {
4444
// NOTE: Optimized revisions are singleflighted by the underlying datastore via the
4545
// CachedOptimizedRevisions struct.
46+
ctx, span := tracer.Start(ctx, "singleflightProxy.OptimizedRevision")
47+
defer span.End()
4648
return p.delegate.OptimizedRevision(ctx)
4749
}
4850

4951
func (p *singleflightProxy) CheckRevision(ctx context.Context, revision datastore.Revision) error {
5052
_, _, err := p.checkRevGroup.Do(ctx, revision.String(), func(ctx context.Context) (string, error) {
51-
return "", p.delegate.CheckRevision(ctx, revision)
53+
// Sever the context so that a single caller's cancellation does not
54+
// abort the query for all other singleflight waiters.
55+
ctx, span := tracer.Start(ctx, "singleflightProxy.CheckRevision(sf)")
56+
defer span.End()
57+
return "", p.delegate.CheckRevision(context.WithoutCancel(ctx), revision)
5258
})
5359
return err
5460
}
5561

5662
func (p *singleflightProxy) HeadRevision(ctx context.Context) (datastore.Revision, error) {
5763
rev, _, err := p.headRevGroup.Do(ctx, "", func(ctx context.Context) (datastore.Revision, error) {
58-
return p.delegate.HeadRevision(ctx)
64+
// Sever the context so that a single caller's cancellation does not
65+
// abort the query for all other singleflight waiters.
66+
ctx, span := tracer.Start(ctx, "singleflightProxy.HeadRevision(sf)")
67+
defer span.End()
68+
return p.delegate.HeadRevision(context.WithoutCancel(ctx))
5969
})
6070
return rev, err
6171
}

internal/datastore/revisions/optimized.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,9 +64,14 @@ func (cor *CachedOptimizedRevisions) OptimizedRevision(ctx context.Context) (dat
6464
cor.RUnlock()
6565

6666
newQuantizedRevision, _, err := cor.updateGroup.Do(ctx, "", func(ctx context.Context) (datastore.Revision, error) {
67+
ctx, span := tracer.Start(ctx, "CachedOptimizedRevisions.OptimizedRevision(sf)")
68+
defer span.End()
6769
log.Ctx(ctx).Debug().Time("now", localNow).Msg("computing new revision")
6870

69-
optimized, validFor, err := cor.optimizedFunc(ctx)
71+
// Sever the context so that a single caller's cancellation does not
72+
// abort the query for all other singleflight waiters. This is safe
73+
// because optimizedFunc is a cheap read-only query (SELECT now()).
74+
optimized, validFor, err := cor.optimizedFunc(context.WithoutCancel(ctx))
7075
if err != nil {
7176
return datastore.NoRevision, fmt.Errorf("unable to compute optimized revision: %w", err)
7277
}

pkg/datastore/context.go

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,14 @@ package datastore
33
import (
44
"context"
55

6+
"go.opentelemetry.io/otel"
7+
68
"github.com/authzed/spicedb/pkg/datastore/options"
79
core "github.com/authzed/spicedb/pkg/proto/core/v1"
810
)
911

12+
var tracer = otel.Tracer("spicedb/pkg/datastore/context")
13+
1014
// NewSeparatingContextDatastoreProxy severs any timeouts in the context being
1115
// passed to the datastore and only retains tracing metadata.
1216
//
@@ -49,14 +53,25 @@ func (p *ctxProxy) IsStrictReadModeEnabled() bool {
4953
}
5054

5155
func (p *ctxProxy) OptimizedRevision(ctx context.Context) (Revision, error) {
52-
return p.delegate.OptimizedRevision(context.WithoutCancel(ctx))
56+
// NOTE: do NOT strip cancellation here. OptimizedRevision is singleflighted
57+
// internally by CachedOptimizedRevisions, and stripping cancel before the
58+
// singleflight makes it impossible for callers to escape the wait when the
59+
// underlying query is slow. context.WithoutCancel is applied inside the
60+
// singleflight function instead (see optimized.go).
61+
ctx, span := tracer.Start(ctx, "ctxProxy.OptimizedRevision")
62+
defer span.End()
63+
return p.delegate.OptimizedRevision(ctx)
5364
}
5465

5566
func (p *ctxProxy) CheckRevision(ctx context.Context, revision Revision) error {
67+
ctx, span := tracer.Start(ctx, "ctxProxy.CheckRevision")
68+
defer span.End()
5669
return p.delegate.CheckRevision(context.WithoutCancel(ctx), revision)
5770
}
5871

5972
func (p *ctxProxy) HeadRevision(ctx context.Context) (Revision, error) {
73+
ctx, span := tracer.Start(ctx, "ctxProxy.HeadRevision")
74+
defer span.End()
6075
return p.delegate.HeadRevision(context.WithoutCancel(ctx))
6176
}
6277

0 commit comments

Comments
 (0)