Skip to content

Commit 9cd1422

Browse files
committed
feat(store): implement secondary index for O(1) sandbox filtering by kind
Signed-off-by: Abhinav Singh <abhinavsingh717073@gmail.com>
1 parent b492d4d commit 9cd1422

4 files changed

Lines changed: 70 additions & 80 deletions

File tree

pkg/store/store_redis.go

Lines changed: 34 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ type redisStore struct {
3535
sessionPrefix string
3636
expiryIndexKey string
3737
lastActivityIndexKey string
38+
kindIndexKeyPrefix string
3839
}
3940

4041
// initRedisStore init redis store client
@@ -49,6 +50,7 @@ func initRedisStore() (*redisStore, error) {
4950
sessionPrefix: "session:",
5051
expiryIndexKey: "session:expiry",
5152
lastActivityIndexKey: "session:last_activity",
53+
kindIndexKeyPrefix: "session:kind:",
5254
}, nil
5355
}
5456

@@ -146,51 +148,30 @@ func (rs *redisStore) GetSandboxBySessionID(ctx context.Context, sessionID strin
146148
// ListSandboxesByKind returns all active sandboxes matching the given kind.
147149
// Uses SCAN to prevent blocking the Redis instance on large datasets.
148150
func (rs *redisStore) ListSandboxesByKind(ctx context.Context, kind string) ([]*types.SandboxInfo, error) {
149-
allSandboxes := make([]*types.SandboxInfo, 0) // Initialize as empty array, not nil
150-
var cursor uint64
151-
matchPattern := rs.sessionPrefix + "*"
152-
seenKeys := make(map[string]bool)
151+
kindKey := rs.kindIndexKeyPrefix + kind
152+
sessionIDs, err := rs.cli.SMembers(ctx, kindKey).Result()
153+
if err != nil {
154+
return nil, fmt.Errorf("ListSandboxesByKind smembers failed: %w", err)
155+
}
153156

154-
for {
155-
keys, nextCursor, err := rs.cli.Scan(ctx, cursor, matchPattern, 100).Result()
156-
if err != nil {
157-
return nil, fmt.Errorf("ListSandboxesByKind scan failed: %w", err)
158-
}
157+
if len(sessionIDs) == 0 {
158+
return make([]*types.SandboxInfo, 0), nil
159+
}
159160

160-
if len(keys) > 0 {
161-
sessionIDs := make([]string, 0, len(keys))
162-
for _, key := range keys {
163-
if key == rs.expiryIndexKey || key == rs.lastActivityIndexKey {
164-
continue
165-
}
166-
if seenKeys[key] {
167-
continue
168-
}
169-
seenKeys[key] = true
170-
sessionIDs = append(sessionIDs, strings.TrimPrefix(key, rs.sessionPrefix))
171-
}
172-
173-
if len(sessionIDs) > 0 {
174-
// Batch load the fetched keys
175-
sandboxes, err := rs.loadSandboxesBySessionIDs(ctx, sessionIDs)
176-
if err != nil {
177-
return nil, err
178-
}
179-
180-
// Filter by requested kind
181-
for _, sb := range sandboxes {
182-
if sb != nil && sb.Kind == kind {
183-
allSandboxes = append(allSandboxes, sb)
184-
}
185-
}
186-
}
187-
}
161+
// Batch load the fetched keys
162+
sandboxes, err := rs.loadSandboxesBySessionIDs(ctx, sessionIDs)
163+
if err != nil {
164+
return nil, err
165+
}
188166

189-
cursor = nextCursor
190-
if cursor == 0 {
191-
break // Scan complete
167+
allSandboxes := make([]*types.SandboxInfo, 0, len(sandboxes))
168+
// Filter by requested kind (defensive check) and ensure not nil
169+
for _, sb := range sandboxes {
170+
if sb != nil && sb.Kind == kind {
171+
allSandboxes = append(allSandboxes, sb)
192172
}
193173
}
174+
194175
return allSandboxes, nil
195176
}
196177

@@ -220,6 +201,7 @@ func (rs *redisStore) StoreSandbox(ctx context.Context, sandboxRedis *types.Sand
220201
Score: float64(time.Now().Unix()),
221202
Member: sandboxRedis.SessionID,
222203
})
204+
pipe.SAdd(ctx, rs.kindIndexKeyPrefix+sandboxRedis.Kind, sandboxRedis.SessionID)
223205

224206
cmder, err := pipe.Exec(ctx)
225207
if err != nil {
@@ -267,10 +249,22 @@ func (rs *redisStore) UpdateSandbox(ctx context.Context, sandboxRedis *types.San
267249
func (rs *redisStore) DeleteSandboxBySessionID(ctx context.Context, sessionID string) error {
268250
sessionKey := rs.sessionKey(sessionID)
269251

252+
// Fetch sandbox to get its kind for secondary index cleanup
253+
var kind string
254+
if b, err := rs.cli.Get(ctx, sessionKey).Bytes(); err == nil {
255+
var sb types.SandboxInfo
256+
if json.Unmarshal(b, &sb) == nil {
257+
kind = sb.Kind
258+
}
259+
}
260+
270261
pipe := rs.cli.Pipeline()
271262
pipe.Del(ctx, sessionKey)
272263
pipe.ZRem(ctx, rs.expiryIndexKey, sessionID)
273264
pipe.ZRem(ctx, rs.lastActivityIndexKey, sessionID)
265+
if kind != "" {
266+
pipe.SRem(ctx, rs.kindIndexKeyPrefix+kind, sessionID)
267+
}
274268

275269
if _, err := pipe.Exec(ctx); err != nil {
276270
return fmt.Errorf("DeleteSandboxBySessionID: pipeline EXEC: %w", err)

pkg/store/store_redis_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ func newTestRedisClient(t *testing.T) (*redisStore, *miniredis.Miniredis) {
6969
sessionPrefix: "session:",
7070
expiryIndexKey: "session:expiry",
7171
lastActivityIndexKey: "session:last_activity",
72+
kindIndexKeyPrefix: "session:kind:",
7273
}
7374
return rs, mr
7475
}

pkg/store/store_valkey.go

Lines changed: 34 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ type valkeyStore struct {
3838
sessionPrefix string
3939
expiryIndexKey string
4040
lastActivityIndexKey string
41+
kindIndexKeyPrefix string
4142
}
4243

4344
// initValkeyStore init valkey store client
@@ -56,6 +57,7 @@ func initValkeyStore() (*valkeyStore, error) {
5657
sessionPrefix: "session:",
5758
expiryIndexKey: "session:expiry",
5859
lastActivityIndexKey: "session:last_activity",
60+
kindIndexKeyPrefix: "session:kind:",
5961
}, nil
6062
}
6163

@@ -178,51 +180,30 @@ func (vs *valkeyStore) GetSandboxBySessionID(ctx context.Context, sessionID stri
178180
// ListSandboxesByKind returns all active sandboxes matching the given kind.
179181
// Uses SCAN to prevent blocking the Valkey instance on large datasets.
180182
func (vs *valkeyStore) ListSandboxesByKind(ctx context.Context, kind string) ([]*types.SandboxInfo, error) {
181-
allSandboxes := make([]*types.SandboxInfo, 0) // Initialize as empty array, not nil
182-
cursor := uint64(0)
183-
matchPattern := vs.sessionPrefix + "*"
184-
seenKeys := make(map[string]bool)
183+
kindKey := vs.kindIndexKeyPrefix + kind
184+
sessionIDs, err := vs.cli.Do(ctx, vs.cli.B().Smembers().Key(kindKey).Build()).AsStrSlice()
185+
if err != nil && !valkey.IsValkeyNil(err) {
186+
return nil, fmt.Errorf("ListSandboxesByKind smembers failed: %w", err)
187+
}
185188

186-
for {
187-
scanRes, err := vs.cli.Do(ctx, vs.cli.B().Scan().Cursor(cursor).Match(matchPattern).Count(100).Build()).AsScanEntry()
188-
if err != nil {
189-
return nil, fmt.Errorf("ListSandboxesByKind scan failed: %w", err)
190-
}
189+
if len(sessionIDs) == 0 {
190+
return make([]*types.SandboxInfo, 0), nil
191+
}
191192

192-
if len(scanRes.Elements) > 0 {
193-
sessionIDs := make([]string, 0, len(scanRes.Elements))
194-
for _, key := range scanRes.Elements {
195-
if key == vs.expiryIndexKey || key == vs.lastActivityIndexKey {
196-
continue
197-
}
198-
if seenKeys[key] {
199-
continue
200-
}
201-
seenKeys[key] = true
202-
sessionIDs = append(sessionIDs, strings.TrimPrefix(key, vs.sessionPrefix))
203-
}
204-
205-
if len(sessionIDs) > 0 {
206-
// Batch load the fetched keys
207-
sandboxes, err := vs.loadSandboxesBySessionIDs(ctx, sessionIDs)
208-
if err != nil {
209-
return nil, err
210-
}
211-
212-
// Filter by requested kind
213-
for _, sb := range sandboxes {
214-
if sb != nil && sb.Kind == kind {
215-
allSandboxes = append(allSandboxes, sb)
216-
}
217-
}
218-
}
219-
}
193+
// Batch load the fetched keys
194+
sandboxes, err := vs.loadSandboxesBySessionIDs(ctx, sessionIDs)
195+
if err != nil {
196+
return nil, err
197+
}
220198

221-
cursor = scanRes.Cursor
222-
if cursor == 0 {
223-
break // Scan complete
199+
allSandboxes := make([]*types.SandboxInfo, 0, len(sandboxes))
200+
// Filter by requested kind
201+
for _, sb := range sandboxes {
202+
if sb != nil && sb.Kind == kind {
203+
allSandboxes = append(allSandboxes, sb)
224204
}
225205
}
206+
226207
return allSandboxes, nil
227208
}
228209

@@ -248,6 +229,7 @@ func (vs *valkeyStore) StoreSandbox(ctx context.Context, sandboxStore *types.San
248229
ScoreMember(float64(sandboxStore.ExpiresAt.Unix()), sandboxStore.SessionID).Build())
249230
commands = append(commands, vs.cli.B().Zadd().Key(vs.lastActivityIndexKey).ScoreMember().
250231
ScoreMember(float64(time.Now().Unix()), sandboxStore.SessionID).Build())
232+
commands = append(commands, vs.cli.B().Sadd().Key(vs.kindIndexKeyPrefix+sandboxStore.Kind).Member(sandboxStore.SessionID).Build())
251233

252234
for i, resp := range vs.cli.DoMulti(ctx, commands...) {
253235
if err = resp.Error(); err != nil {
@@ -286,10 +268,22 @@ func (vs *valkeyStore) UpdateSandbox(ctx context.Context, sandboxStore *types.Sa
286268
func (vs *valkeyStore) DeleteSandboxBySessionID(ctx context.Context, sessionID string) error {
287269
sessionKey := vs.sessionKey(sessionID)
288270

271+
// Fetch sandbox to get its kind for secondary index cleanup
272+
var kind string
273+
if b, err := vs.cli.Do(ctx, vs.cli.B().Get().Key(sessionKey).Build()).AsBytes(); err == nil {
274+
var sb types.SandboxInfo
275+
if json.Unmarshal(b, &sb) == nil {
276+
kind = sb.Kind
277+
}
278+
}
279+
289280
commands := make(valkey.Commands, 0, 4)
290281
commands = append(commands, vs.cli.B().Del().Key(sessionKey).Build())
291282
commands = append(commands, vs.cli.B().Zrem().Key(vs.expiryIndexKey).Member(sessionID).Build())
292283
commands = append(commands, vs.cli.B().Zrem().Key(vs.lastActivityIndexKey).Member(sessionID).Build())
284+
if kind != "" {
285+
commands = append(commands, vs.cli.B().Srem().Key(vs.kindIndexKeyPrefix+kind).Member(sessionID).Build())
286+
}
293287

294288
for i, resp := range vs.cli.DoMulti(ctx, commands...) {
295289
if err := resp.Error(); err != nil {

pkg/store/store_valkey_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,7 @@ func newValkeyTestClient(t *testing.T) (*valkeyStore, *miniredis.Miniredis) {
137137
sessionPrefix: "session:",
138138
expiryIndexKey: "session:expiry",
139139
lastActivityIndexKey: "session:last_activity",
140+
kindIndexKeyPrefix: "session:kind:",
140141
}
141142
return rs, mr
142143
}

0 commit comments

Comments
 (0)