diff --git a/codecov.yml b/codecov.yml index 4c5d5a131a..48bb17518c 100644 --- a/codecov.yml +++ b/codecov.yml @@ -7,3 +7,4 @@ ignore: - "rpc" - "ui" - "**/mock_*.go" + - "**/*_mock.go" diff --git a/internal/cmd/grpc.go b/internal/cmd/grpc.go index 1e6e47f9bc..3e02faaae4 100644 --- a/internal/cmd/grpc.go +++ b/internal/cmd/grpc.go @@ -250,6 +250,10 @@ func NewGRPCServer( logger.Debug("cache enabled", zap.Stringer("backend", cacher)) } + if s, ok := store.(storage.PinnableSnapshot); ok { + interceptors = append(interceptors, middlewaregrpc.FliptPinSnapshot(s.ContextWithSnapshot)) + } + if cfg.Storage.IsReadOnly() { store = unmodifiable.NewStore(store) } diff --git a/internal/server/evaluation/ofrep_bridge_test.go b/internal/server/evaluation/ofrep_bridge_test.go index f0df8a0413..511fdd6927 100644 --- a/internal/server/evaluation/ofrep_bridge_test.go +++ b/internal/server/evaluation/ofrep_bridge_test.go @@ -1,7 +1,6 @@ package evaluation import ( - "context" "testing" "github.com/stretchr/testify/assert" @@ -55,7 +54,7 @@ func TestOFREPFlagEvaluation_Variant(t *testing.T) { }, }, nil) - output, err := s.OFREPFlagEvaluation(context.TODO(), ofrep.EvaluationBridgeInput{ + output, err := s.OFREPFlagEvaluation(t.Context(), ofrep.EvaluationBridgeInput{ FlagKey: flagKey, NamespaceKey: namespaceKey, Context: map[string]string{ @@ -90,7 +89,7 @@ func TestOFREPFlagEvaluation_Boolean(t *testing.T) { store.On("GetEvaluationRollouts", mock.Anything, storage.NewResource(namespaceKey, flagKey)).Return([]*storage.EvaluationRollout{}, nil) - output, err := s.OFREPFlagEvaluation(context.TODO(), ofrep.EvaluationBridgeInput{ + output, err := s.OFREPFlagEvaluation(t.Context(), ofrep.EvaluationBridgeInput{ FlagKey: flagKey, NamespaceKey: namespaceKey, Context: map[string]string{ diff --git a/internal/server/evaluator.go b/internal/server/evaluator.go index a3bb6cb116..48a0bcde88 100644 --- a/internal/server/evaluator.go +++ b/internal/server/evaluator.go @@ -20,7 +20,7 @@ func (s *Server) Evaluate(ctx context.Context, r *flipt.EvaluationRequest) (*fli flag, err := s.store.GetFlag(ctx, storage.NewResource(r.NamespaceKey, r.FlagKey)) if err != nil { - var resp = &flipt.EvaluationResponse{} + resp := &flipt.EvaluationResponse{} resp.Reason = flipt.EvaluationReason_ERROR_EVALUATION_REASON var errnf errs.ErrNotFound diff --git a/internal/server/middleware/grpc/middleware.go b/internal/server/middleware/grpc/middleware.go index 7f1de6c642..09ff804b90 100644 --- a/internal/server/middleware/grpc/middleware.go +++ b/internal/server/middleware/grpc/middleware.go @@ -411,3 +411,10 @@ func forwardHeader(ctx context.Context, req *http.Request, headerKey string) met } return md } + +func FliptPinSnapshot(fn func(context.Context) context.Context) grpc.UnaryServerInterceptor { + return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) { + ctx = fn(ctx) + return handler(ctx, req) + } +} diff --git a/internal/storage/fs/git/store.go b/internal/storage/fs/git/store.go index aa415380b4..196ed711a7 100644 --- a/internal/storage/fs/git/store.go +++ b/internal/storage/fs/git/store.go @@ -474,3 +474,7 @@ func (s *SnapshotStore) buildSnapshot(ctx context.Context, hash plumbing.Hash) ( return storagefs.SnapshotFromFS(s.logger, gfs, storagefs.WithEtag(hash.String())) } + +func (s *SnapshotStore) ContextWithSnapshot(ctx context.Context) context.Context { + return ctx +} diff --git a/internal/storage/fs/local/store.go b/internal/storage/fs/local/store.go index 59af0b435a..2b293e6264 100644 --- a/internal/storage/fs/local/store.go +++ b/internal/storage/fs/local/store.go @@ -13,6 +13,10 @@ import ( var _ storagefs.SnapshotStore = (*SnapshotStore)(nil) +type contextKey struct{} + +var snapshotCtxKey = contextKey{} + // SnapshotStore implements storagefs.SnapshotStore which // is backed by the local filesystem through os.DirFS type SnapshotStore struct { @@ -58,7 +62,10 @@ func WithPollOptions(opts ...containers.Option[storagefs.Poller]) containers.Opt // View passes the current snapshot to the provided function // while holding a read lock. -func (s *SnapshotStore) View(_ context.Context, fn func(storage.ReadOnlyStore) error) error { +func (s *SnapshotStore) View(ctx context.Context, fn func(storage.ReadOnlyStore) error) error { + if snap, ok := ctx.Value(snapshotCtxKey).(storage.ReadOnlyStore); ok { + return fn(snap) + } s.mu.RLock() defer s.mu.RUnlock() return fn(s.snap) @@ -83,3 +90,9 @@ func (s *SnapshotStore) update(context.Context) (bool, error) { func (s *SnapshotStore) String() string { return "local" } + +func (s *SnapshotStore) ContextWithSnapshot(ctx context.Context) context.Context { + s.mu.RLock() + defer s.mu.RUnlock() + return context.WithValue(ctx, snapshotCtxKey, s.snap) +} diff --git a/internal/storage/fs/object/store.go b/internal/storage/fs/object/store.go index 6be65b9173..b0d4ab6e8a 100644 --- a/internal/storage/fs/object/store.go +++ b/internal/storage/fs/object/store.go @@ -24,6 +24,10 @@ import ( var _ storagefs.SnapshotStore = (*SnapshotStore)(nil) +type contextKey struct{} + +var snapshotCtxKey = contextKey{} + type SnapshotStore struct { *storagefs.Poller @@ -72,7 +76,11 @@ func WithPollOptions(opts ...containers.Option[storagefs.Poller]) containers.Opt // View accepts a function which takes a *StoreSnapshot. // The SnapshotStore will supply a snapshot which is valid // for the lifetime of the provided function call. -func (s *SnapshotStore) View(_ context.Context, fn func(storage.ReadOnlyStore) error) error { +func (s *SnapshotStore) View(ctx context.Context, fn func(storage.ReadOnlyStore) error) error { + if snap, ok := ctx.Value(snapshotCtxKey).(storage.ReadOnlyStore); ok { + return fn(snap) + } + s.mu.RLock() defer s.mu.RUnlock() return fn(s.snap) @@ -201,3 +209,9 @@ func (s *SnapshotStore) getIndex(ctx context.Context) (*storagefs.FliptIndex, er return idx, nil } + +func (s *SnapshotStore) ContextWithSnapshot(ctx context.Context) context.Context { + s.mu.RLock() + defer s.mu.RUnlock() + return context.WithValue(ctx, snapshotCtxKey, s.snap) +} diff --git a/internal/storage/fs/oci/store.go b/internal/storage/fs/oci/store.go index 12f1c56377..ce0c2ded2c 100644 --- a/internal/storage/fs/oci/store.go +++ b/internal/storage/fs/oci/store.go @@ -14,6 +14,10 @@ import ( var _ storagefs.SnapshotStore = (*SnapshotStore)(nil) +type contextKey struct{} + +var snapshotCtxKey = contextKey{} + // SnapshotStore is an implementation storage.SnapshotStore backed by OCI repositories. // It fetches instances of OCI manifests and uses them to build snapshots from their contents. type SnapshotStore struct { @@ -34,7 +38,11 @@ type SnapshotStore struct { // View accepts a function which takes a *StoreSnapshot. // The SnapshotStore will supply a snapshot which is valid // for the lifetime of the provided function call. -func (s *SnapshotStore) View(_ context.Context, fn func(storage.ReadOnlyStore) error) error { +func (s *SnapshotStore) View(ctx context.Context, fn func(storage.ReadOnlyStore) error) error { + if snap, ok := ctx.Value(snapshotCtxKey).(storage.ReadOnlyStore); ok { + return fn(snap) + } + s.mu.RLock() defer s.mu.RUnlock() return fn(s.snap) @@ -101,3 +109,9 @@ func (s *SnapshotStore) update(ctx context.Context) (bool, error) { return true, nil } + +func (s *SnapshotStore) ContextWithSnapshot(ctx context.Context) context.Context { + s.mu.RLock() + defer s.mu.RUnlock() + return context.WithValue(ctx, snapshotCtxKey, s.snap) +} diff --git a/internal/storage/fs/store.go b/internal/storage/fs/store.go index 56666391a5..276a1fd8d6 100644 --- a/internal/storage/fs/store.go +++ b/internal/storage/fs/store.go @@ -31,6 +31,7 @@ type ReferencedSnapshotStore interface { // Empty reference will be regarded as the default reference by the store. View(context.Context, storage.Reference, func(storage.ReadOnlyStore) error) error fmt.Stringer + storage.PinnableSnapshot } // SnapshotStore is a type which has a single function View. @@ -42,6 +43,7 @@ type SnapshotStore interface { // for the lifetime of the provided function call. View(context.Context, func(storage.ReadOnlyStore) error) error fmt.Stringer + storage.PinnableSnapshot } // SingleReferenceSnapshotStore implements ReferencedSnapshotStore but delegates to a SnapshotStore implementation. @@ -76,6 +78,8 @@ type Store struct { viewer ReferencedSnapshotStore } +var _ storage.PinnableSnapshot = (*Store)(nil) + func NewStore(viewer ReferencedSnapshotStore) *Store { return &Store{viewer: viewer} } @@ -322,3 +326,7 @@ func (s *Store) GetVersion(ctx context.Context, ns storage.NamespaceRequest) (ve return err }) } + +func (s *Store) ContextWithSnapshot(ctx context.Context) context.Context { + return s.viewer.ContextWithSnapshot(ctx) +} diff --git a/internal/storage/fs/store_test.go b/internal/storage/fs/store_test.go index 59485326fe..1ff2a24ce9 100644 --- a/internal/storage/fs/store_test.go +++ b/internal/storage/fs/store_test.go @@ -247,6 +247,10 @@ func (s snapshotStoreMock) View(_ context.Context, _ storage.Reference, fn func( return fn(s.MockStore) } +func (s snapshotStoreMock) ContextWithSnapshot(ctx context.Context) context.Context { + return ctx +} + func (s snapshotStoreMock) String() string { return "mock" } diff --git a/internal/storage/sql/common/storage.go b/internal/storage/sql/common/storage.go index f8c1f6ad09..4ee817b9e7 100644 --- a/internal/storage/sql/common/storage.go +++ b/internal/storage/sql/common/storage.go @@ -47,7 +47,6 @@ func (s *Store) GetVersion(ctx context.Context, ns storage.NamespaceRequest) (st RunWith(s.db). QueryRowContext(ctx). Scan(&stateModifiedAt) - if err != nil { return "", err } diff --git a/internal/storage/storage.go b/internal/storage/storage.go index 42ea4abb74..baf55b13ea 100644 --- a/internal/storage/storage.go +++ b/internal/storage/storage.go @@ -454,3 +454,12 @@ func NewID(id string, opts ...containers.Option[ReferenceRequest]) IDRequest { containers.ApplyAll(&p.ReferenceRequest, opts...) return p } + +type PinnableSnapshot interface { + // ContextWithSnapshot allows to optionally capture a snapshot and return a derived context. + // + // It wraps the provided parent context and attaches internal snapshot if available. + // This allows subsequent storage reads during the same evaluation request to use + // the captured snapshot, ensuring atomic evaluation. + ContextWithSnapshot(context.Context) context.Context +}