Skip to content

Commit 4e7ab3b

Browse files
committed
fix: address bot review feedback on secondary index implementation
Signed-off-by: Abhinav Singh <abhinavsingh717073@gmail.com>
1 parent 441a55d commit 4e7ab3b

2 files changed

Lines changed: 56 additions & 10 deletions

File tree

pkg/store/store_redis.go

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"time"
2727

2828
redisv9 "github.com/redis/go-redis/v9"
29+
"k8s.io/klog/v2"
2930

3031
"github.com/volcano-sh/agentcube/pkg/common/types"
3132
)
@@ -149,9 +150,19 @@ func (rs *redisStore) GetSandboxBySessionID(ctx context.Context, sessionID strin
149150
// Uses SCAN to prevent blocking the Redis instance on large datasets.
150151
func (rs *redisStore) ListSandboxesByKind(ctx context.Context, kind string) ([]*types.SandboxInfo, error) {
151152
kindKey := rs.kindIndexKeyPrefix + kind
152-
sessionIDs, err := rs.cli.SMembers(ctx, kindKey).Result()
153-
if err != nil {
154-
return nil, fmt.Errorf("ListSandboxesByKind smembers failed: %w", err)
153+
var sessionIDs []string
154+
seenKeys := make(map[string]bool)
155+
156+
iter := rs.cli.SScan(ctx, kindKey, 0, "", 100).Iterator()
157+
for iter.Next(ctx) {
158+
val := iter.Val()
159+
if !seenKeys[val] {
160+
seenKeys[val] = true
161+
sessionIDs = append(sessionIDs, val)
162+
}
163+
}
164+
if err := iter.Err(); err != nil {
165+
return nil, fmt.Errorf("ListSandboxesByKind sscan failed: %w", err)
155166
}
156167

157168
if len(sessionIDs) == 0 {
@@ -251,9 +262,16 @@ func (rs *redisStore) DeleteSandboxBySessionID(ctx context.Context, sessionID st
251262

252263
// Fetch sandbox to get its kind for secondary index cleanup
253264
var kind string
254-
if b, err := rs.cli.Get(ctx, sessionKey).Bytes(); err == nil {
265+
b, err := rs.cli.Get(ctx, sessionKey).Bytes()
266+
if err != nil {
267+
if !errors.Is(err, redisv9.Nil) {
268+
klog.Warningf("Failed to get sandbox %s for kind cleanup, secondary index may be orphaned: %v", sessionID, err)
269+
}
270+
} else {
255271
var sb types.SandboxInfo
256-
if json.Unmarshal(b, &sb) == nil {
272+
if err := json.Unmarshal(b, &sb); err != nil {
273+
klog.Warningf("Failed to unmarshal sandbox %s for kind cleanup, secondary index may be orphaned: %v", sessionID, err)
274+
} else {
257275
kind = sb.Kind
258276
}
259277
}

pkg/store/store_valkey.go

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -181,9 +181,30 @@ func (vs *valkeyStore) GetSandboxBySessionID(ctx context.Context, sessionID stri
181181
// Uses SCAN to prevent blocking the Valkey instance on large datasets.
182182
func (vs *valkeyStore) ListSandboxesByKind(ctx context.Context, kind string) ([]*types.SandboxInfo, error) {
183183
kindKey := vs.kindIndexKeyPrefix + kind
184-
sessionIDs, err := vs.cli.Do(ctx, vs.cli.B().Smembers().Key(kindKey).Build()).AsStrSlice()
185-
if err != nil && !valkey.IsValkeyNil(err) {
186-
return nil, fmt.Errorf("ListSandboxesByKind smembers failed: %w", err)
184+
var sessionIDs []string
185+
seenKeys := make(map[string]bool)
186+
cursor := uint64(0)
187+
188+
for {
189+
scanRes, err := vs.cli.Do(ctx, vs.cli.B().Sscan().Key(kindKey).Cursor(cursor).Count(100).Build()).AsScanEntry()
190+
if err != nil {
191+
if valkey.IsValkeyNil(err) {
192+
break
193+
}
194+
return nil, fmt.Errorf("ListSandboxesByKind sscan failed: %w", err)
195+
}
196+
197+
for _, key := range scanRes.Elements {
198+
if !seenKeys[key] {
199+
seenKeys[key] = true
200+
sessionIDs = append(sessionIDs, key)
201+
}
202+
}
203+
204+
cursor = scanRes.Cursor
205+
if cursor == 0 {
206+
break // Scan complete
207+
}
187208
}
188209

189210
if len(sessionIDs) == 0 {
@@ -270,9 +291,16 @@ func (vs *valkeyStore) DeleteSandboxBySessionID(ctx context.Context, sessionID s
270291

271292
// Fetch sandbox to get its kind for secondary index cleanup
272293
var kind string
273-
if b, err := vs.cli.Do(ctx, vs.cli.B().Get().Key(sessionKey).Build()).AsBytes(); err == nil {
294+
b, err := vs.cli.Do(ctx, vs.cli.B().Get().Key(sessionKey).Build()).AsBytes()
295+
if err != nil {
296+
if !valkey.IsValkeyNil(err) {
297+
klog.Warningf("Failed to get sandbox %s for kind cleanup, secondary index may be orphaned: %v", sessionID, err)
298+
}
299+
} else {
274300
var sb types.SandboxInfo
275-
if json.Unmarshal(b, &sb) == nil {
301+
if err := json.Unmarshal(b, &sb); err != nil {
302+
klog.Warningf("Failed to unmarshal sandbox %s for kind cleanup, secondary index may be orphaned: %v", sessionID, err)
303+
} else {
276304
kind = sb.Kind
277305
}
278306
}

0 commit comments

Comments
 (0)