Skip to content

Commit 5979cb2

Browse files
authored
feat: implement handling of dispatch and caching for DispatchQueryPlan (#3079)
# Description Implements caching and cache keys for DispatchQueryPlan. With this PR, the alias iterator results, for Check alone, are cached in the usual dispatch cache for SpiceDB. This also introduces an alias-chain-collapse optimization, which reduces multiple consecutive Alias boundaries into a single boundary (to simplify dispatch)
1 parent 97811cf commit 5979cb2

17 files changed

Lines changed: 1009 additions & 85 deletions

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
66
## [Unreleased]
77
### Added
88
- Add DispatchExecutor, a query plan executor that is Dispatch-aware and sends subproblems on Alias boundaries (https://github.com/authzed/spicedb/pull/3074)
9+
- Implement Dispatch caching for query plan execution (https://github.com/authzed/spicedb/pull/3079)
910

1011
### Changed
1112
- Build: strip quarantine attribute for MacOS (https://github.com/authzed/spicedb/pull/3082)

internal/dispatch/caching/caching.go

Lines changed: 49 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -405,8 +405,55 @@ func (cd *Dispatcher) DispatchLookupSubjects(req *v1.DispatchLookupSubjectsReque
405405
}
406406

407407
func (cd *Dispatcher) DispatchQueryPlan(req *v1.DispatchQueryPlanRequest, stream dispatch.PlanStream) error {
408-
// TODO: add caching logic
409-
return cd.d.DispatchQueryPlan(req, stream)
408+
switch req.Operation {
409+
case v1.PlanOperation_PLAN_OPERATION_CHECK:
410+
return cd.dispatchQueryPlanCheckCached(req, stream)
411+
default:
412+
// TODO: add caching for LookupResources and LookupSubjects
413+
return cd.d.DispatchQueryPlan(req, stream)
414+
}
415+
}
416+
417+
func (cd *Dispatcher) dispatchQueryPlanCheckCached(req *v1.DispatchQueryPlanRequest, stream dispatch.PlanStream) error {
418+
cd.checkTotalCounter.Inc()
419+
420+
requestKey, err := cd.keyHandler.PlanCheckCacheKey(stream.Context(), req)
421+
if err != nil {
422+
return err
423+
}
424+
425+
if cachedPathRaw, found := cd.c.Get(requestKey); found {
426+
var cachedPath v1.ResultPath
427+
if err := cachedPath.UnmarshalVT(cachedPathRaw.([]byte)); err != nil {
428+
return err
429+
}
430+
cd.checkFromCacheCounter.Inc()
431+
return stream.Publish(&v1.DispatchQueryPlanResponse{
432+
Paths: []*v1.ResultPath{&cachedPath},
433+
})
434+
}
435+
436+
// Cache miss — collect the streamed result to cache the path.
437+
// Check produces at most one response containing a single ResultPath.
438+
collecting := dispatch.NewCollectingDispatchStream[*v1.DispatchQueryPlanResponse](stream.Context())
439+
if err := cd.d.DispatchQueryPlan(req, collecting); err != nil {
440+
return err
441+
}
442+
443+
for _, resp := range collecting.Results() {
444+
if len(resp.Paths) > 0 {
445+
pathBytes, err := resp.Paths[0].MarshalVT()
446+
if err == nil {
447+
cd.c.Set(requestKey, pathBytes, sliceSize(pathBytes))
448+
}
449+
}
450+
451+
if err := stream.Publish(resp); err != nil {
452+
return err
453+
}
454+
}
455+
456+
return nil
410457
}
411458

412459
func (cd *Dispatcher) Close() error {

internal/dispatch/graph/graph.go

Lines changed: 159 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"google.golang.org/grpc/codes"
1414
"google.golang.org/grpc/status"
1515

16+
"github.com/authzed/spicedb/internal/caveats"
1617
"github.com/authzed/spicedb/internal/dispatch"
1718
"github.com/authzed/spicedb/internal/graph"
1819
log "github.com/authzed/spicedb/internal/logging"
@@ -24,6 +25,8 @@ import (
2425
"github.com/authzed/spicedb/pkg/middleware/nodeid"
2526
core "github.com/authzed/spicedb/pkg/proto/core/v1"
2627
v1 "github.com/authzed/spicedb/pkg/proto/dispatch/v1"
28+
"github.com/authzed/spicedb/pkg/query"
29+
"github.com/authzed/spicedb/pkg/schema/v2"
2730
"github.com/authzed/spicedb/pkg/tuple"
2831
)
2932

@@ -471,12 +474,166 @@ func (ld *localDispatcher) DispatchLookupSubjects(
471474
)
472475
}
473476

474-
// DispatchQueryPlan implements dispatch.Plan interface
477+
// DispatchQueryPlan implements dispatch.Plan interface.
478+
// It loads the schema, compiles the plan, finds the subtree by canonical key,
479+
// and executes it locally. The Impl method is called directly on the found
480+
// iterator to avoid re-triggering the executor's dispatch decision on the
481+
// same alias boundary that was already dispatched by the caller.
475482
func (ld *localDispatcher) DispatchQueryPlan(
476483
req *v1.DispatchQueryPlanRequest,
477484
stream dispatch.PlanStream,
478485
) error {
479-
return errors.New("DispatchQueryPlan not yet implemented")
486+
ctx := stream.Context()
487+
488+
planCtx := req.PlanContext
489+
if planCtx == nil {
490+
return errors.New("DispatchQueryPlan: missing plan_context")
491+
}
492+
493+
revision, err := ld.parseRevision(ctx, planCtx.Revision)
494+
if err != nil {
495+
return err
496+
}
497+
498+
// Load schema at the requested revision.
499+
// TODO: use cached compiled plans (Phase 7) instead of recompiling each time.
500+
it, err := ld.findIteratorByCanonicalKey(ctx, revision, query.CanonicalKey(req.CanonicalKey))
501+
if err != nil {
502+
return err
503+
}
504+
505+
// Build execution context with DispatchExecutor so nested alias boundaries
506+
// in the subtree can re-dispatch through the full dispatch chain.
507+
dl := datalayer.MustFromContext(ctx)
508+
executor := dispatch.NewDispatchExecutor(ld, planCtx)
509+
qctx := &query.Context{
510+
Context: ctx,
511+
Executor: executor,
512+
Reader: query.NewQueryDatastoreReader(dl.SnapshotReader(revision)),
513+
CaveatRunner: caveats.NewCaveatRunner(caveattypes.Default.TypeSet),
514+
CaveatContext: dispatch.CaveatContextFromPlanContext(planCtx),
515+
}
516+
517+
resource := query.Object{ObjectType: req.Resource.Namespace, ObjectID: req.Resource.ObjectId}
518+
subject := query.ObjectAndRelation{
519+
ObjectType: req.Subject.Namespace,
520+
ObjectID: req.Subject.ObjectId,
521+
Relation: req.Subject.Relation,
522+
}
523+
524+
// Call Impl directly — the dispatch boundary has already been crossed.
525+
switch req.Operation {
526+
case v1.PlanOperation_PLAN_OPERATION_CHECK:
527+
path, err := it.CheckImpl(qctx, resource, subject)
528+
if err != nil {
529+
return err
530+
}
531+
if path != nil {
532+
return stream.Publish(&v1.DispatchQueryPlanResponse{
533+
Paths: []*v1.ResultPath{dispatch.QueryPathToResultPath(path)},
534+
})
535+
}
536+
return nil
537+
538+
case v1.PlanOperation_PLAN_OPERATION_LOOKUP_RESOURCES:
539+
// TODO: implement caching for LookupResources results
540+
pathSeq, err := it.IterResourcesImpl(qctx, subject, query.NoObjectFilter())
541+
if err != nil {
542+
return err
543+
}
544+
for path, err := range pathSeq {
545+
if err != nil {
546+
return err
547+
}
548+
if err := stream.Publish(&v1.DispatchQueryPlanResponse{
549+
Paths: []*v1.ResultPath{dispatch.QueryPathToResultPath(path)},
550+
}); err != nil {
551+
return err
552+
}
553+
}
554+
return nil
555+
556+
case v1.PlanOperation_PLAN_OPERATION_LOOKUP_SUBJECTS:
557+
// TODO: implement caching for LookupSubjects results
558+
pathSeq, err := it.IterSubjectsImpl(qctx, resource, query.NoObjectFilter())
559+
if err != nil {
560+
return err
561+
}
562+
for path, err := range pathSeq {
563+
if err != nil {
564+
return err
565+
}
566+
if err := stream.Publish(&v1.DispatchQueryPlanResponse{
567+
Paths: []*v1.ResultPath{dispatch.QueryPathToResultPath(path)},
568+
}); err != nil {
569+
return err
570+
}
571+
}
572+
return nil
573+
574+
default:
575+
return fmt.Errorf("DispatchQueryPlan: unknown operation %v", req.Operation)
576+
}
577+
}
578+
579+
// findIteratorByCanonicalKey loads the schema at the given revision, compiles
580+
// all permissions, and returns the iterator subtree matching the canonical key.
581+
func (ld *localDispatcher) findIteratorByCanonicalKey(ctx context.Context, revision datastore.Revision, targetKey query.CanonicalKey) (query.Iterator, error) {
582+
dl := datalayer.MustFromContext(ctx)
583+
reader := dl.SnapshotReader(revision)
584+
585+
sr, err := reader.ReadSchema(ctx)
586+
if err != nil {
587+
return nil, fmt.Errorf("DispatchQueryPlan: failed to read schema: %w", err)
588+
}
589+
590+
nsDefs, err := sr.ListAllTypeDefinitions(ctx)
591+
if err != nil {
592+
return nil, fmt.Errorf("DispatchQueryPlan: failed to list type definitions: %w", err)
593+
}
594+
595+
caveatDefs, err := sr.ListAllCaveatDefinitions(ctx)
596+
if err != nil {
597+
return nil, fmt.Errorf("DispatchQueryPlan: failed to list caveat definitions: %w", err)
598+
}
599+
600+
fullSchema, err := schema.BuildSchemaFromDefinitions(
601+
datastore.DefinitionsOf(nsDefs),
602+
datastore.DefinitionsOf(caveatDefs),
603+
)
604+
if err != nil {
605+
return nil, fmt.Errorf("DispatchQueryPlan: failed to build schema: %w", err)
606+
}
607+
608+
for nsName, def := range fullSchema.Definitions() {
609+
for permName := range def.Permissions() {
610+
co, err := query.BuildOutlineFromSchema(fullSchema, nsName, permName)
611+
if err != nil {
612+
continue
613+
}
614+
it, err := co.Compile()
615+
if err != nil {
616+
continue
617+
}
618+
if found := findByCanonicalKey(it, targetKey); found != nil {
619+
return found, nil
620+
}
621+
}
622+
}
623+
return nil, fmt.Errorf("DispatchQueryPlan: no iterator found for canonical key %q", targetKey)
624+
}
625+
626+
// findByCanonicalKey recursively searches an iterator tree for a node matching the key.
627+
func findByCanonicalKey(it query.Iterator, key query.CanonicalKey) query.Iterator {
628+
if it.CanonicalKey() == key {
629+
return it
630+
}
631+
for _, sub := range it.Subiterators() {
632+
if found := findByCanonicalKey(sub, key); found != nil {
633+
return found
634+
}
635+
}
636+
return nil
480637
}
481638

482639
func (ld *localDispatcher) Close() error {

internal/dispatch/keys/computed.go

Lines changed: 39 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,14 @@ type cachePrefix string
2020
// Define the various prefixes for the cache entries. These must *all* be unique and must *all*
2121
// also be placed into the cachePrefixes slice below.
2222
const (
23-
checkViaRelationPrefix cachePrefix = "cr"
24-
checkViaCanonicalPrefix cachePrefix = "cc"
25-
lookupPrefix cachePrefix = "l"
26-
expandPrefix cachePrefix = "e"
27-
lookupSubjectsPrefix cachePrefix = "ls"
23+
checkViaRelationPrefix cachePrefix = "cr"
24+
checkViaCanonicalPrefix cachePrefix = "cc"
25+
lookupPrefix cachePrefix = "l"
26+
expandPrefix cachePrefix = "e"
27+
lookupSubjectsPrefix cachePrefix = "ls"
28+
planCheckPrefix cachePrefix = "pc"
29+
planLookupResourcesPrefix cachePrefix = "pr"
30+
planLookupSubjectsPrefix cachePrefix = "ps"
2831
)
2932

3033
var cachePrefixes = []cachePrefix{
@@ -33,6 +36,9 @@ var cachePrefixes = []cachePrefix{
3336
lookupPrefix,
3437
expandPrefix,
3538
lookupSubjectsPrefix,
39+
planCheckPrefix,
40+
planLookupResourcesPrefix,
41+
planLookupSubjectsPrefix,
3642
}
3743

3844
// checkRequestToKey converts a check request into a cache key based on the relation
@@ -113,3 +119,31 @@ func lookupSubjectsRequestToKey(req *v1.DispatchLookupSubjectsRequest, option di
113119
hashableIds(req.ResourceIds),
114120
)
115121
}
122+
123+
// planCheckRequestToKey converts a plan check request into a cache key
124+
func planCheckRequestToKey(req *v1.DispatchQueryPlanRequest, option dispatchCacheKeyHashComputeOption) DispatchCacheKey {
125+
return dispatchCacheKeyHash(planCheckPrefix, req.PlanContext.Revision, option,
126+
hashableString(req.CanonicalKey),
127+
hashableOnr{req.Resource},
128+
hashableOnr{req.Subject},
129+
hashableContext{Struct: req.PlanContext.CaveatContext},
130+
)
131+
}
132+
133+
// planLookupResourcesRequestToKey converts a plan lookup resources request into a cache key
134+
func planLookupResourcesRequestToKey(req *v1.DispatchQueryPlanRequest, option dispatchCacheKeyHashComputeOption) DispatchCacheKey {
135+
return dispatchCacheKeyHash(planLookupResourcesPrefix, req.PlanContext.Revision, option,
136+
hashableString(req.CanonicalKey),
137+
hashableOnr{req.Subject},
138+
hashableContext{Struct: req.PlanContext.CaveatContext},
139+
)
140+
}
141+
142+
// planLookupSubjectsRequestToKey converts a plan lookup subjects request into a cache key
143+
func planLookupSubjectsRequestToKey(req *v1.DispatchQueryPlanRequest, option dispatchCacheKeyHashComputeOption) DispatchCacheKey {
144+
return dispatchCacheKeyHash(planLookupSubjectsPrefix, req.PlanContext.Revision, option,
145+
hashableString(req.CanonicalKey),
146+
hashableOnr{req.Resource},
147+
hashableContext{Struct: req.PlanContext.CaveatContext},
148+
)
149+
}

internal/dispatch/keys/computed_test.go

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -522,6 +522,68 @@ var generatorFuncs = map[string]generatorFunc{
522522
}
523523
},
524524

525+
// Plan Check.
526+
string(planCheckPrefix): func(
527+
resourceIds []string,
528+
subjectIds []string,
529+
resourceRelation *core.RelationReference,
530+
subjectRelation *core.RelationReference,
531+
metadata *v1.ResolverMeta,
532+
) (DispatchCacheKey, []string) {
533+
return planCheckRequestToKey(&v1.DispatchQueryPlanRequest{
534+
CanonicalKey: resourceRelation.Relation,
535+
Resource: ONR(resourceRelation.Namespace, resourceIds[0], resourceRelation.Relation),
536+
Subject: ONR(subjectRelation.Namespace, subjectIds[0], subjectRelation.Relation),
537+
PlanContext: &v1.PlanContext{Revision: metadata.AtRevision},
538+
}, computeBothHashes), []string{
539+
resourceRelation.Relation,
540+
resourceRelation.Namespace,
541+
resourceIds[0],
542+
subjectRelation.Namespace,
543+
subjectIds[0],
544+
subjectRelation.Relation,
545+
}
546+
},
547+
548+
// Plan Lookup Resources.
549+
string(planLookupResourcesPrefix): func(
550+
resourceIds []string,
551+
subjectIds []string,
552+
resourceRelation *core.RelationReference,
553+
subjectRelation *core.RelationReference,
554+
metadata *v1.ResolverMeta,
555+
) (DispatchCacheKey, []string) {
556+
return planLookupResourcesRequestToKey(&v1.DispatchQueryPlanRequest{
557+
CanonicalKey: resourceRelation.Relation,
558+
Subject: ONR(subjectRelation.Namespace, subjectIds[0], subjectRelation.Relation),
559+
PlanContext: &v1.PlanContext{Revision: metadata.AtRevision},
560+
}, computeBothHashes), []string{
561+
resourceRelation.Relation,
562+
subjectRelation.Namespace,
563+
subjectIds[0],
564+
subjectRelation.Relation,
565+
}
566+
},
567+
568+
// Plan Lookup Subjects.
569+
string(planLookupSubjectsPrefix): func(
570+
resourceIds []string,
571+
subjectIds []string,
572+
resourceRelation *core.RelationReference,
573+
subjectRelation *core.RelationReference,
574+
metadata *v1.ResolverMeta,
575+
) (DispatchCacheKey, []string) {
576+
return planLookupSubjectsRequestToKey(&v1.DispatchQueryPlanRequest{
577+
CanonicalKey: resourceRelation.Relation,
578+
Resource: ONR(resourceRelation.Namespace, resourceIds[0], resourceRelation.Relation),
579+
PlanContext: &v1.PlanContext{Revision: metadata.AtRevision},
580+
}, computeBothHashes), []string{
581+
resourceRelation.Relation,
582+
resourceRelation.Namespace,
583+
resourceIds[0],
584+
}
585+
},
586+
525587
// Lookup Subjects.
526588
string(lookupSubjectsPrefix): func(
527589
resourceIds []string,

0 commit comments

Comments
 (0)