Skip to content

Commit 86fb8ae

Browse files
committed
fix: cache invalidation on delete and not-found errors
- Invalidate ristretto cache entries when schemas/versions are deleted, preventing stale data from being served after recreation - Return not-found error when deleting non-existent namespaces, schemas, or versions by checking RowsAffected - Add Del method to Cache interface Fixes #180 Fixes #145
1 parent fbb471c commit 86fb8ae

5 files changed

Lines changed: 73 additions & 11 deletions

File tree

core/schema/mocks/schema_cache.go

Lines changed: 5 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

core/schema/schema.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,25 +2,29 @@ package schema
22

33
import "context"
44

5+
// Metadata represents schema metadata such as format and compatibility.
56
type Metadata struct {
67
Authority string
78
Format string
89
Compatibility string
910
}
1011

12+
// SchemaInfo represents version and location information for a schema.
1113
type SchemaInfo struct {
1214
ID string `json:"id"`
1315
Version int32 `json:"version"`
1416
Location string `json:"location"`
1517
}
1618

19+
// SchemaFile represents a schema's data along with its extracted types and fields.
1720
type SchemaFile struct {
1821
ID string
1922
Types []string
2023
Fields []string
2124
Data []byte
2225
}
2326

27+
// Repository defines the persistence interface for schemas.
2428
type Repository interface {
2529
Create(ctx context.Context, namespace string, schema string, metadata *Metadata, versionID string, schemaFile *SchemaFile) (version int32, err error)
2630
List(context.Context, string) ([]Schema, error)
@@ -33,6 +37,7 @@ type Repository interface {
3337
DeleteVersion(context.Context, string, string, int32) error
3438
}
3539

40+
// ParsedSchema defines the interface for a parsed schema with compatibility checks.
3641
type ParsedSchema interface {
3742
IsBackwardCompatible(ParsedSchema) error
3843
IsForwardCompatible(ParsedSchema) error
@@ -41,15 +46,19 @@ type ParsedSchema interface {
4146
GetCanonicalValue() *SchemaFile
4247
}
4348

49+
// Provider defines the interface for parsing raw schema data into a ParsedSchema.
4450
type Provider interface {
4551
ParseSchema(format string, data []byte) (ParsedSchema, error)
4652
}
4753

54+
// Cache defines a key-value cache interface for storing schema data.
4855
type Cache interface {
4956
Get(interface{}) (interface{}, bool)
5057
Set(interface{}, interface{}, int64) bool
58+
Del(interface{})
5159
}
5260

61+
// Schema represents a schema entity with its configuration.
5362
type Schema struct {
5463
Name string
5564
Format string

core/schema/service.go

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"github.com/raystack/stencil/internal/store"
1111
)
1212

13+
// NewService creates a new schema service with the given dependencies.
1314
func NewService(repo Repository, provider Provider, nsSvc NamespaceService, cache Cache) *Service {
1415
return &Service{
1516
repo: repo,
@@ -23,6 +24,7 @@ type NamespaceService interface {
2324
Get(ctx context.Context, name string) (namespace.Namespace, error)
2425
}
2526

27+
// Service provides schema management operations.
2628
type Service struct {
2729
provider Provider
2830
repo Repository
@@ -75,6 +77,7 @@ func (s *Service) checkCompatibility(ctx context.Context, nsName, schemaName, fo
7577
return checkerFn(current, []ParsedSchema{prevSchema})
7678
}
7779

80+
// Create validates, parses, and stores a new schema version.
7881
func (s *Service) Create(ctx context.Context, nsName string, schemaName string, metadata *Metadata, data []byte) (SchemaInfo, error) {
7982
var scInfo SchemaInfo
8083
ns, err := s.namespaceService.Get(ctx, nsName)
@@ -114,18 +117,33 @@ func (s *Service) withMetadata(ctx context.Context, namespace, schemaName string
114117
return meta, data, err
115118
}
116119

120+
// Get retrieves a specific schema version by namespace, name, and version number.
117121
func (s *Service) Get(ctx context.Context, namespace string, schemaName string, version int32) (*Metadata, []byte, error) {
118122
return s.withMetadata(ctx, namespace, schemaName, func() ([]byte, error) { return s.cachedGetSchema(ctx, namespace, schemaName, version) })
119123
}
120124

125+
// Delete removes a schema and all its versions, and invalidates cached entries.
121126
func (s *Service) Delete(ctx context.Context, namespace string, schemaName string) error {
122-
return s.repo.Delete(ctx, namespace, schemaName)
127+
versions, _ := s.repo.ListVersions(ctx, namespace, schemaName)
128+
err := s.repo.Delete(ctx, namespace, schemaName)
129+
if err == nil {
130+
for _, v := range versions {
131+
s.cache.Del(schemaKeyFunc(namespace, schemaName, v))
132+
}
133+
}
134+
return err
123135
}
124136

137+
// DeleteVersion removes a specific version of a schema and invalidates the cached entry.
125138
func (s *Service) DeleteVersion(ctx context.Context, namespace string, schemaName string, version int32) error {
126-
return s.repo.DeleteVersion(ctx, namespace, schemaName, version)
139+
err := s.repo.DeleteVersion(ctx, namespace, schemaName, version)
140+
if err == nil {
141+
s.cache.Del(schemaKeyFunc(namespace, schemaName, version))
142+
}
143+
return err
127144
}
128145

146+
// GetLatest retrieves the latest version of a schema.
129147
func (s *Service) GetLatest(ctx context.Context, namespace string, schemaName string) (*Metadata, []byte, error) {
130148
version, err := s.repo.GetLatestVersion(ctx, namespace, schemaName)
131149
if err != nil {
@@ -134,18 +152,22 @@ func (s *Service) GetLatest(ctx context.Context, namespace string, schemaName st
134152
return s.Get(ctx, namespace, schemaName, version)
135153
}
136154

155+
// GetMetadata retrieves the metadata for a schema.
137156
func (s *Service) GetMetadata(ctx context.Context, namespace, schemaName string) (*Metadata, error) {
138157
return s.repo.GetMetadata(ctx, namespace, schemaName)
139158
}
140159

160+
// UpdateMetadata updates the metadata for a schema.
141161
func (s *Service) UpdateMetadata(ctx context.Context, namespace, schemaName string, meta *Metadata) (*Metadata, error) {
142162
return s.repo.UpdateMetadata(ctx, namespace, schemaName, meta)
143163
}
144164

165+
// List returns all schemas in a namespace.
145166
func (s *Service) List(ctx context.Context, namespaceID string) ([]Schema, error) {
146167
return s.repo.List(ctx, namespaceID)
147168
}
148169

170+
// ListVersions returns all version numbers for a schema.
149171
func (s *Service) ListVersions(ctx context.Context, namespaceID string, schemaName string) ([]int32, error) {
150172
return s.repo.ListVersions(ctx, namespaceID, schemaName)
151173
}

internal/store/postgres/namespace_repository.go

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55

66
"github.com/georgysavva/scany/pgxscan"
77
"github.com/raystack/stencil/core/namespace"
8+
"github.com/raystack/stencil/internal/store"
89
)
910

1011
const namespaceListQuery = `
@@ -60,9 +61,17 @@ func (r *NamespaceRepository) Get(ctx context.Context, id string) (namespace.Nam
6061
}
6162

6263
func (r *NamespaceRepository) Delete(ctx context.Context, id string) error {
63-
_, err := r.db.Exec(ctx, namespaceDeleteQuery, id)
64-
r.db.Exec(ctx, deleteOrphanedData)
65-
return wrapError(err, "%s", id)
64+
ct, err := r.db.Exec(ctx, namespaceDeleteQuery, id)
65+
if err != nil {
66+
return wrapError(err, "%s", id)
67+
}
68+
if ct.RowsAffected() == 0 {
69+
return store.NoRowsErr.WithErr(nil, id)
70+
}
71+
if _, execErr := r.db.Exec(ctx, deleteOrphanedData); execErr != nil {
72+
return wrapError(execErr, "%s", id)
73+
}
74+
return nil
6675
}
6776

6877
func (r *NamespaceRepository) List(ctx context.Context) ([]namespace.Namespace, error) {

internal/store/postgres/schema_repository.go

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"github.com/jackc/pgx/v4"
88
"github.com/pkg/errors"
99
"github.com/raystack/stencil/core/schema"
10+
"github.com/raystack/stencil/internal/store"
1011
)
1112

1213
type SchemaRepository struct {
@@ -84,10 +85,18 @@ func (r *SchemaRepository) List(ctx context.Context, namespaceID string) ([]sche
8485
}
8586

8687
func (r *SchemaRepository) Delete(ctx context.Context, ns string, sc string) error {
87-
_, err := r.db.Exec(ctx, deleteSchemaQuery, ns, sc)
88+
ct, err := r.db.Exec(ctx, deleteSchemaQuery, ns, sc)
89+
if err != nil {
90+
return wrapError(err, "delete schema")
91+
}
92+
if ct.RowsAffected() == 0 {
93+
return store.NoRowsErr.WithErr(nil, "schema")
94+
}
8895
// Idempotent operation to clean orphaned data.
89-
r.db.Exec(ctx, deleteOrphanedData)
90-
return wrapError(err, "delete schema")
96+
if _, execErr := r.db.Exec(ctx, deleteOrphanedData); execErr != nil {
97+
return wrapError(execErr, "delete schema")
98+
}
99+
return nil
91100
}
92101

93102
func (r *SchemaRepository) ListVersions(ctx context.Context, ns string, sc string) ([]int32, error) {
@@ -97,10 +106,18 @@ func (r *SchemaRepository) ListVersions(ctx context.Context, ns string, sc strin
97106
}
98107

99108
func (r *SchemaRepository) DeleteVersion(ctx context.Context, ns string, sc string, version int32) error {
100-
_, err := r.db.Exec(ctx, deleteVersionQuery, ns, sc, version)
109+
ct, err := r.db.Exec(ctx, deleteVersionQuery, ns, sc, version)
110+
if err != nil {
111+
return wrapError(err, "delete version")
112+
}
113+
if ct.RowsAffected() == 0 {
114+
return store.NoRowsErr.WithErr(nil, "version")
115+
}
101116
// Idempotent operation to clean orphaned data.
102-
r.db.Exec(ctx, deleteOrphanedData)
103-
return wrapError(err, "delete version")
117+
if _, execErr := r.db.Exec(ctx, deleteOrphanedData); execErr != nil {
118+
return wrapError(execErr, "delete version")
119+
}
120+
return nil
104121
}
105122

106123
const schemaInsertQuery = `

0 commit comments

Comments
 (0)