Skip to content

Commit 7ce5801

Browse files
committed
fix: cache invalidation on delete and not-found errors for deletes
- Invalidate ristretto cache entries when schemas/versions are deleted, preventing stale data from being served after recreation (#180) - Return not-found error when deleting non-existent namespaces, schemas, or versions by checking RowsAffected (#145) - Add Del method to Cache interface for cache invalidation Fixes #180 Fixes #145
1 parent e1fb38d commit 7ce5801

5 files changed

Lines changed: 47 additions & 10 deletions

File tree

core/schema/mocks/schema_cache.go

Lines changed: 5 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

core/schema/schema.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ type Provider interface {
5555
type Cache interface {
5656
Get(interface{}) (interface{}, bool)
5757
Set(interface{}, interface{}, int64) bool
58+
Del(interface{})
5859
}
5960

6061
// Schema represents a schema entity with its configuration.

core/schema/service.go

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -122,14 +122,25 @@ func (s *Service) Get(ctx context.Context, namespace string, schemaName string,
122122
return s.withMetadata(ctx, namespace, schemaName, func() ([]byte, error) { return s.cachedGetSchema(ctx, namespace, schemaName, version) })
123123
}
124124

125-
// Delete removes a schema and all its versions.
125+
// Delete removes a schema and all its versions, and invalidates cached entries.
126126
func (s *Service) Delete(ctx context.Context, namespace string, schemaName string) error {
127-
return s.repo.Delete(ctx, namespace, schemaName)
127+
versions, _ := s.repo.ListVersions(ctx, namespace, schemaName)
128+
err := s.repo.Delete(ctx, namespace, schemaName)
129+
if err == nil {
130+
for _, v := range versions {
131+
s.cache.Del(schemaKeyFunc(namespace, schemaName, v))
132+
}
133+
}
134+
return err
128135
}
129136

130-
// DeleteVersion removes a specific version of a schema.
137+
// DeleteVersion removes a specific version of a schema and invalidates the cached entry.
131138
func (s *Service) DeleteVersion(ctx context.Context, namespace string, schemaName string, version int32) error {
132-
return s.repo.DeleteVersion(ctx, namespace, schemaName, version)
139+
err := s.repo.DeleteVersion(ctx, namespace, schemaName, version)
140+
if err == nil {
141+
s.cache.Del(schemaKeyFunc(namespace, schemaName, version))
142+
}
143+
return err
133144
}
134145

135146
// GetLatest retrieves the latest version of a schema.

internal/store/postgres/namespace_repository.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55

66
"github.com/georgysavva/scany/pgxscan"
77
"github.com/raystack/stencil/core/namespace"
8+
"github.com/raystack/stencil/internal/store"
89
)
910

1011
const namespaceListQuery = `
@@ -60,11 +61,17 @@ func (r *NamespaceRepository) Get(ctx context.Context, id string) (namespace.Nam
6061
}
6162

6263
func (r *NamespaceRepository) Delete(ctx context.Context, id string) error {
63-
_, err := r.db.Exec(ctx, namespaceDeleteQuery, id)
64+
ct, err := r.db.Exec(ctx, namespaceDeleteQuery, id)
65+
if err != nil {
66+
return wrapError(err, "%s", id)
67+
}
68+
if ct.RowsAffected() == 0 {
69+
return store.NoRowsErr.WithErr(nil, id)
70+
}
6471
if _, execErr := r.db.Exec(ctx, deleteOrphanedData); execErr != nil {
6572
return wrapError(execErr, "%s", id)
6673
}
67-
return wrapError(err, "%s", id)
74+
return nil
6875
}
6976

7077
func (r *NamespaceRepository) List(ctx context.Context) ([]namespace.Namespace, error) {

internal/store/postgres/schema_repository.go

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"github.com/jackc/pgx/v4"
88
"github.com/pkg/errors"
99
"github.com/raystack/stencil/core/schema"
10+
"github.com/raystack/stencil/internal/store"
1011
)
1112

1213
type SchemaRepository struct {
@@ -84,12 +85,18 @@ func (r *SchemaRepository) List(ctx context.Context, namespaceID string) ([]sche
8485
}
8586

8687
func (r *SchemaRepository) Delete(ctx context.Context, ns string, sc string) error {
87-
_, err := r.db.Exec(ctx, deleteSchemaQuery, ns, sc)
88+
ct, err := r.db.Exec(ctx, deleteSchemaQuery, ns, sc)
89+
if err != nil {
90+
return wrapError(err, "delete schema")
91+
}
92+
if ct.RowsAffected() == 0 {
93+
return store.NoRowsErr.WithErr(nil, "schema")
94+
}
8895
// Idempotent operation to clean orphaned data.
8996
if _, execErr := r.db.Exec(ctx, deleteOrphanedData); execErr != nil {
9097
return wrapError(execErr, "delete schema")
9198
}
92-
return wrapError(err, "delete schema")
99+
return nil
93100
}
94101

95102
func (r *SchemaRepository) ListVersions(ctx context.Context, ns string, sc string) ([]int32, error) {
@@ -99,12 +106,18 @@ func (r *SchemaRepository) ListVersions(ctx context.Context, ns string, sc strin
99106
}
100107

101108
func (r *SchemaRepository) DeleteVersion(ctx context.Context, ns string, sc string, version int32) error {
102-
_, err := r.db.Exec(ctx, deleteVersionQuery, ns, sc, version)
109+
ct, err := r.db.Exec(ctx, deleteVersionQuery, ns, sc, version)
110+
if err != nil {
111+
return wrapError(err, "delete version")
112+
}
113+
if ct.RowsAffected() == 0 {
114+
return store.NoRowsErr.WithErr(nil, "version")
115+
}
103116
// Idempotent operation to clean orphaned data.
104117
if _, execErr := r.db.Exec(ctx, deleteOrphanedData); execErr != nil {
105118
return wrapError(execErr, "delete version")
106119
}
107-
return wrapError(err, "delete version")
120+
return nil
108121
}
109122

110123
const schemaInsertQuery = `

0 commit comments

Comments
 (0)