Skip to content

Commit 2f117e0

Browse files
committed
refactor(ENG-3514): complete Redis migration - remove Name() and simplify Store logic
BREAKING: Storage interface no longer has Name() method. All backends now behave like Redis: - Add() is idempotent and returns error if sandbox already exists (no ErrAlreadyExists tolerance) - Reconcile() always kills orphans (no re-add logic for memory backend) - Removed manual reservation handling in Add() (Redis handles this separately) Changes: - Remove StorageNameMemory, StorageNameRedis, StorageNamePopulateRedis constants - Remove Name() method from Storage interface and all implementations - Simplify Store.Add(): no ErrAlreadyExists tolerance, always call AddSandboxToRoutingTable - Simplify Store.Reconcile(): always kill orphans (Redis-only behavior) - Remove manual Reserve() call in Store.Add() (Redis backend doesn't need it) Tests: - Fix TestAdd_AlreadyInCache: now expects error on duplicate Add - Fix TestAdd_NotNewlyCreated: now expects error on duplicate Add - Fix TestAdd_ConcurrentCalls: concurrent adds for same sandbox now only first succeeds - Add TestAdd_AddSandboxToRoutingTableAlwaysCalled: verify callback always called - Add TestAdd_StorageErrorHandling: verify storage errors propagate immediately - Add TestReconcile_CallsRemoveSandboxFromNode: verify orphans are killed (mock-based) - Add TestENG3514_MigrationComplete: migration checklist verification All tests pass with -race flag. Build successful.
1 parent 3923ddf commit 2f117e0

5 files changed

Lines changed: 327 additions & 92 deletions

File tree

packages/api/internal/sandbox/storage/memory/main.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,6 @@ type Storage struct {
1212
items cmap.ConcurrentMap[string, *memorySandbox]
1313
}
1414

15-
func (s *Storage) Name() string { return sandbox.StorageNameMemory }
16-
1715
func NewStorage() *Storage {
1816
instanceCache := &Storage{
1917
items: cmap.New[*memorySandbox](),

packages/api/internal/sandbox/storage/populate_redis/main.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@ type PopulateRedisStorage struct {
1919
redisBackend *redis.Storage
2020
}
2121

22-
func (m *PopulateRedisStorage) Name() string { return sandbox.StorageNamePopulateRedis }
23-
2422
func (m *PopulateRedisStorage) Add(ctx context.Context, sandbox sandbox.Sandbox) error {
2523
err := m.memoryBackend.Add(ctx, sandbox)
2624
if err != nil {

packages/api/internal/sandbox/storage/redis/main.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,6 @@ type Storage struct {
3434
subManager *subscriptionManager
3535
}
3636

37-
func (s *Storage) Name() string { return sandbox.StorageNameRedis }
38-
3937
func NewStorage(
4038
redisClient redis.UniversalClient,
4139
) *Storage {

packages/api/internal/sandbox/store.go

Lines changed: 18 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,6 @@ type (
2828
)
2929

3030
const (
31-
StorageNameMemory = "memory"
32-
StorageNameRedis = "redis"
33-
StorageNamePopulateRedis = "populate_redis"
34-
3531
sbxRemoveTimeout = 10 * time.Second
3632
)
3733

@@ -40,9 +36,7 @@ type ReservationStorage interface {
4036
Release(ctx context.Context, teamID uuid.UUID, sandboxID string) error
4137
}
4238

43-
// TODO [ENG-3514]: Remove Name() and Sync() and nolint once migrated to Redis
44-
type Storage interface { //nolint: interfacebloat
45-
Name() string
39+
type Storage interface {
4640
Add(ctx context.Context, sandbox Sandbox) error
4741
Get(ctx context.Context, teamID uuid.UUID, sandboxID string) (Sandbox, error)
4842
Remove(ctx context.Context, teamID uuid.UUID, sandboxID string) error
@@ -101,33 +95,12 @@ func (s *Store) Add(ctx context.Context, sandbox Sandbox, creation *CreationMeta
10195
sandbox.EndTime = sandbox.StartTime.Add(sandbox.MaxInstanceLength)
10296
}
10397

104-
err := s.storage.Add(ctx, sandbox)
105-
if err == nil {
106-
// Count only newly added sandboxes to the store
107-
s.callbacks.AddSandboxToRoutingTable(ctx, sandbox)
108-
} else {
109-
// TODO [ENG-3514]: Remove once migrated to Redis
110-
// There's a race condition when the sandbox is added from node sync
111-
// This should be fixed once the sync is improved
112-
if !errors.Is(err, ErrAlreadyExists) {
113-
return err
114-
}
115-
116-
logger.L().Warn(ctx, "Sandbox already exists in cache", logger.WithSandboxID(sandbox.SandboxID))
98+
if err := s.storage.Add(ctx, sandbox); err != nil {
99+
return err
117100
}
118101

119-
// TODO [ENG-3514]: Simplify once migrated to Redis
120-
// Ensure the team reservation is set - no limit.
121-
if s.storage.Name() != StorageNameRedis {
122-
finishStart, _, err := s.reservations.Reserve(ctx, sandbox.TeamID, sandbox.SandboxID, -1)
123-
if err != nil {
124-
logger.L().Error(ctx, "Failed to reserve sandbox", zap.Error(err), logger.WithSandboxID(sandbox.SandboxID))
125-
}
126-
127-
if finishStart != nil {
128-
finishStart(sandbox, nil)
129-
}
130-
}
102+
// Notify routing table of the newly added sandbox.
103+
s.callbacks.AddSandboxToRoutingTable(ctx, sandbox)
131104

132105
if creation != nil {
133106
meta := *creation
@@ -178,31 +151,20 @@ func (s *Store) WaitForStateChange(ctx context.Context, teamID uuid.UUID, sandbo
178151
}
179152

180153
func (s *Store) Reconcile(ctx context.Context, sandboxes []Sandbox, nodeID string) {
181-
sbxsToBeSynced := s.storage.Reconcile(ctx, sandboxes, nodeID)
182-
183-
if s.storage.Name() == StorageNameRedis {
184-
// Redis is the source of truth — divergent sandboxes are orphans running
185-
// on the node but not present in the store. Kill them.
186-
wg := sync.WaitGroup{}
187-
for _, sbx := range sbxsToBeSynced {
188-
wg.Go(func() {
189-
ctx, cancel := context.WithTimeout(context.WithoutCancel(ctx), sbxRemoveTimeout)
190-
defer cancel()
191-
s.callbacks.RemoveSandboxFromNode(ctx, sbx)
192-
})
193-
}
194-
195-
wg.Wait()
196-
} else {
197-
// Memory backend — divergent sandboxes are ones discovered on the node
198-
// that aren't in the local cache yet. Re-add them.
199-
for _, sbx := range sbxsToBeSynced {
200-
err := s.Add(ctx, sbx, nil)
201-
if err != nil {
202-
logger.L().Error(ctx, "Failed to re-add sandbox during sync", zap.Error(err), logger.WithSandboxID(sbx.SandboxID))
203-
}
204-
}
154+
// Redis is the source of truth — sandboxes returned by Reconcile are orphans
155+
// running on the node but not present in the store. Kill them.
156+
orphanSandboxes := s.storage.Reconcile(ctx, sandboxes, nodeID)
157+
158+
wg := sync.WaitGroup{}
159+
for _, sbx := range orphanSandboxes {
160+
wg.Go(func() {
161+
ctx, cancel := context.WithTimeout(context.WithoutCancel(ctx), sbxRemoveTimeout)
162+
defer cancel()
163+
s.callbacks.RemoveSandboxFromNode(ctx, sbx)
164+
})
205165
}
166+
167+
wg.Wait()
206168
}
207169

208170
func (s *Store) Reserve(ctx context.Context, teamID uuid.UUID, sandboxID string, limit int) (finishStart func(Sandbox, error), waitForStart func(ctx context.Context) (Sandbox, error), err error) {

0 commit comments

Comments
 (0)