Skip to content

Commit b35599d

Browse files
committed
feat: add session observability GET endpoints for sandboxes
Signed-off-by: Abhinav Singh <abhinavsingh717073@gmail.com>
1 parent ced2f28 commit b35599d

15 files changed

Lines changed: 557 additions & 29 deletions

pkg/router/session_manager_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,10 @@ func (f *fakeStoreClient) ListExpiredSandboxes(_ context.Context, _ time.Time, _
8686
return nil, nil
8787
}
8888

89+
func (f *fakeStoreClient) ListSandboxesByKind(_ context.Context, _ string) ([]*types.SandboxInfo, error) {
90+
return nil, nil
91+
}
92+
8993
func (f *fakeStoreClient) ListInactiveSandboxes(_ context.Context, _ time.Time, _ int64) ([]*types.SandboxInfo, error) {
9094
return nil, nil
9195
}

pkg/store/interface.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ type Store interface {
3434
UpdateSandbox(ctx context.Context, sandboxStore *types.SandboxInfo) error
3535
// DeleteSandboxBySessionID delete sandbox by session ID
3636
DeleteSandboxBySessionID(ctx context.Context, sessionID string) error
37+
// ListSandboxesByKind returns all active sandboxes matching the given kind.
38+
ListSandboxesByKind(ctx context.Context, kind string) ([]*types.SandboxInfo, error)
3739
// ListExpiredSandboxes returns up to limit sandboxes with ExpiresAt before the given time
3840
ListExpiredSandboxes(ctx context.Context, before time.Time, limit int64) ([]*types.SandboxInfo, error)
3941
// ListInactiveSandboxes returns up to limit sandboxes with last-activity time before the given time

pkg/store/store_redis.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,57 @@ func (rs *redisStore) GetSandboxBySessionID(ctx context.Context, sessionID strin
143143
return &sandboxRedis, nil
144144
}
145145

146+
// ListSandboxesByKind returns all active sandboxes matching the given kind.
147+
// Uses SCAN to prevent blocking the Redis instance on large datasets.
148+
func (rs *redisStore) ListSandboxesByKind(ctx context.Context, kind string) ([]*types.SandboxInfo, error) {
149+
allSandboxes := make([]*types.SandboxInfo, 0) // Initialize as empty array, not nil
150+
var cursor uint64
151+
matchPattern := rs.sessionPrefix + "*"
152+
seenKeys := make(map[string]bool)
153+
154+
for {
155+
keys, nextCursor, err := rs.cli.Scan(ctx, cursor, matchPattern, 100).Result()
156+
if err != nil {
157+
return nil, fmt.Errorf("ListSandboxesByKind scan failed: %w", err)
158+
}
159+
160+
if len(keys) > 0 {
161+
sessionIDs := make([]string, 0, len(keys))
162+
for _, key := range keys {
163+
if key == rs.expiryIndexKey || key == rs.lastActivityIndexKey {
164+
continue
165+
}
166+
if seenKeys[key] {
167+
continue
168+
}
169+
seenKeys[key] = true
170+
sessionIDs = append(sessionIDs, strings.TrimPrefix(key, rs.sessionPrefix))
171+
}
172+
173+
if len(sessionIDs) > 0 {
174+
// Batch load the fetched keys
175+
sandboxes, err := rs.loadSandboxesBySessionIDs(ctx, sessionIDs)
176+
if err != nil {
177+
return nil, err
178+
}
179+
180+
// Filter by requested kind
181+
for _, sb := range sandboxes {
182+
if sb != nil && sb.Kind == kind {
183+
allSandboxes = append(allSandboxes, sb)
184+
}
185+
}
186+
}
187+
}
188+
189+
cursor = nextCursor
190+
if cursor == 0 {
191+
break // Scan complete
192+
}
193+
}
194+
return allSandboxes, nil
195+
}
196+
146197
func (rs *redisStore) StoreSandbox(ctx context.Context, sandboxRedis *types.SandboxInfo) error {
147198
if sandboxRedis == nil {
148199
return errors.New("StoreSandbox: sandbox is nil")

pkg/store/store_redis_test.go

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,8 @@ func newTestRedisClient(t *testing.T) (*redisStore, *miniredis.Miniredis) {
6767
rs := &redisStore{
6868
cli: redisv9.NewClient(&redisv9.Options{Addr: mr.Addr()}),
6969
sessionPrefix: "session:",
70-
expiryIndexKey: "sandbox:expiry",
71-
lastActivityIndexKey: "sandbox:last_activity",
70+
expiryIndexKey: "session:expiry",
71+
lastActivityIndexKey: "session:last_activity",
7272
}
7373
return rs, mr
7474
}
@@ -344,3 +344,45 @@ func TestUpdateSandboxLastActivity(t *testing.T) {
344344
t.Fatalf("unexpected lastActivity score after update: got %v, want %v", score, newLastActivity.Unix())
345345
}
346346
}
347+
348+
func TestRedisStore_ListSandboxesByKind(t *testing.T) {
349+
ctx := context.Background()
350+
c, _ := newTestRedisClient(t)
351+
352+
now := time.Now().UTC()
353+
sb1 := newTestSandbox("sb-1", "sess-1", now.Add(1*time.Hour))
354+
sb1.Kind = types.AgentRuntimeKind
355+
356+
sb2 := newTestSandbox("sb-2", "sess-2", now.Add(1*time.Hour))
357+
sb2.Kind = types.CodeInterpreterKind
358+
359+
sb3 := newTestSandbox("sb-3", "sess-3", now.Add(1*time.Hour))
360+
sb3.Kind = types.AgentRuntimeKind
361+
362+
assert.NoError(t, c.StoreSandbox(ctx, sb1))
363+
assert.NoError(t, c.StoreSandbox(ctx, sb2))
364+
assert.NoError(t, c.StoreSandbox(ctx, sb3))
365+
366+
// List AgentRuntimeKind
367+
list, err := c.ListSandboxesByKind(ctx, types.AgentRuntimeKind)
368+
assert.NoError(t, err)
369+
assert.Len(t, list, 2)
370+
371+
ids := map[string]bool{}
372+
for _, sb := range list {
373+
ids[sb.SessionID] = true
374+
}
375+
assert.True(t, ids["sess-1"])
376+
assert.True(t, ids["sess-3"])
377+
378+
// List CodeInterpreterKind
379+
list2, err := c.ListSandboxesByKind(ctx, types.CodeInterpreterKind)
380+
assert.NoError(t, err)
381+
assert.Len(t, list2, 1)
382+
assert.Equal(t, "sess-2", list2[0].SessionID)
383+
384+
// List unknown kind
385+
list3, err := c.ListSandboxesByKind(ctx, "unknown-kind")
386+
assert.NoError(t, err)
387+
assert.Len(t, list3, 0)
388+
}

pkg/store/store_valkey.go

Lines changed: 64 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -111,19 +111,26 @@ func (vs *valkeyStore) loadSandboxesBySessionIDs(ctx context.Context, sessionIDs
111111
sessionIDKeys = append(sessionIDKeys, vs.sessionKey(sessionID))
112112
}
113113
// MGet should in same slot
114-
stingSliceResults, err := vs.cli.Do(ctx, vs.cli.B().Mget().Key(sessionIDKeys...).Build()).AsStrSlice()
114+
valkeyResults, err := vs.cli.Do(ctx, vs.cli.B().Mget().Key(sessionIDKeys...).Build()).ToArray()
115115
if err != nil {
116116
return nil, fmt.Errorf("loadSandboxesBySessionIDs: Valkey MGet sandboxes failed: %w", err)
117117
}
118118

119-
if len(stingSliceResults) > len(sessionIDKeys) {
120-
return nil, fmt.Errorf("unexpected MGet result size: %d, param size: %d", len(stingSliceResults), len(sessionIDKeys))
119+
if len(valkeyResults) != len(sessionIDKeys) {
120+
return nil, fmt.Errorf("unexpected MGet result size: %d, expected: %d", len(valkeyResults), len(sessionIDKeys))
121121
}
122122

123-
sandboxResults := make([]*types.SandboxInfo, 0, len(stingSliceResults))
124-
for i, sandboxObjString := range stingSliceResults {
123+
sandboxResults := make([]*types.SandboxInfo, 0, len(valkeyResults))
124+
for i, msg := range valkeyResults {
125+
if msg.IsNil() {
126+
// key does not exist, ignore
127+
continue
128+
}
129+
sandboxObjString, err := msg.ToString()
130+
if err != nil {
131+
return nil, fmt.Errorf("parse sandbox string failed: %w, index: %v, sessionID: %v", err, i, sessionIDs[i])
132+
}
125133
if len(sandboxObjString) == 0 {
126-
// sandboxObjString is empty while sessionKey not exist, ignore
127134
continue
128135
}
129136
var sandboxRedis types.SandboxInfo
@@ -168,6 +175,57 @@ func (vs *valkeyStore) GetSandboxBySessionID(ctx context.Context, sessionID stri
168175
return &sandboxRedis, nil
169176
}
170177

178+
// ListSandboxesByKind returns all active sandboxes matching the given kind.
179+
// Uses SCAN to prevent blocking the Valkey instance on large datasets.
180+
func (vs *valkeyStore) ListSandboxesByKind(ctx context.Context, kind string) ([]*types.SandboxInfo, error) {
181+
allSandboxes := make([]*types.SandboxInfo, 0) // Initialize as empty array, not nil
182+
cursor := uint64(0)
183+
matchPattern := vs.sessionPrefix + "*"
184+
seenKeys := make(map[string]bool)
185+
186+
for {
187+
scanRes, err := vs.cli.Do(ctx, vs.cli.B().Scan().Cursor(cursor).Match(matchPattern).Count(100).Build()).AsScanEntry()
188+
if err != nil {
189+
return nil, fmt.Errorf("ListSandboxesByKind scan failed: %w", err)
190+
}
191+
192+
if len(scanRes.Elements) > 0 {
193+
sessionIDs := make([]string, 0, len(scanRes.Elements))
194+
for _, key := range scanRes.Elements {
195+
if key == vs.expiryIndexKey || key == vs.lastActivityIndexKey {
196+
continue
197+
}
198+
if seenKeys[key] {
199+
continue
200+
}
201+
seenKeys[key] = true
202+
sessionIDs = append(sessionIDs, strings.TrimPrefix(key, vs.sessionPrefix))
203+
}
204+
205+
if len(sessionIDs) > 0 {
206+
// Batch load the fetched keys
207+
sandboxes, err := vs.loadSandboxesBySessionIDs(ctx, sessionIDs)
208+
if err != nil {
209+
return nil, err
210+
}
211+
212+
// Filter by requested kind
213+
for _, sb := range sandboxes {
214+
if sb != nil && sb.Kind == kind {
215+
allSandboxes = append(allSandboxes, sb)
216+
}
217+
}
218+
}
219+
}
220+
221+
cursor = scanRes.Cursor
222+
if cursor == 0 {
223+
break // Scan complete
224+
}
225+
}
226+
return allSandboxes, nil
227+
}
228+
171229
// StoreSandbox store sandbox into storage
172230
func (vs *valkeyStore) StoreSandbox(ctx context.Context, sandboxStore *types.SandboxInfo) error {
173231
if sandboxStore == nil {

pkg/store/store_valkey_test.go

Lines changed: 78 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -135,8 +135,8 @@ func newValkeyTestClient(t *testing.T) (*valkeyStore, *miniredis.Miniredis) {
135135
rs := &valkeyStore{
136136
cli: client,
137137
sessionPrefix: "session:",
138-
expiryIndexKey: "sandbox:expiry",
139-
lastActivityIndexKey: "sandbox:last_activity",
138+
expiryIndexKey: "session:expiry",
139+
lastActivityIndexKey: "session:last_activity",
140140
}
141141
return rs, mr
142142
}
@@ -360,3 +360,79 @@ func TestValkeyStore_UpdateSandboxLastActivity(t *testing.T) {
360360
assert.Error(t, err)
361361
assert.True(t, errors.Is(err, ErrNotFound))
362362
}
363+
364+
func TestValkeyStore_ListSandboxesByKind(t *testing.T) {
365+
ctx := context.Background()
366+
c, _ := newValkeyTestClient(t)
367+
368+
now := time.Now().UTC()
369+
sb1 := newTestSandbox("sb-1", "sess-1", now.Add(1*time.Hour))
370+
sb1.Kind = types.AgentRuntimeKind
371+
372+
sb2 := newTestSandbox("sb-2", "sess-2", now.Add(1*time.Hour))
373+
sb2.Kind = types.CodeInterpreterKind
374+
375+
sb3 := newTestSandbox("sb-3", "sess-3", now.Add(1*time.Hour))
376+
sb3.Kind = types.AgentRuntimeKind
377+
378+
assert.NoError(t, c.StoreSandbox(ctx, sb1))
379+
assert.NoError(t, c.StoreSandbox(ctx, sb2))
380+
assert.NoError(t, c.StoreSandbox(ctx, sb3))
381+
382+
// List AgentRuntimeKind
383+
list, err := c.ListSandboxesByKind(ctx, types.AgentRuntimeKind)
384+
assert.NoError(t, err)
385+
assert.Len(t, list, 2)
386+
387+
ids := map[string]bool{}
388+
for _, sb := range list {
389+
ids[sb.SessionID] = true
390+
}
391+
assert.True(t, ids["sess-1"])
392+
assert.True(t, ids["sess-3"])
393+
394+
// List CodeInterpreterKind
395+
list2, err := c.ListSandboxesByKind(ctx, types.CodeInterpreterKind)
396+
assert.NoError(t, err)
397+
assert.Len(t, list2, 1)
398+
assert.Equal(t, "sess-2", list2[0].SessionID)
399+
400+
// List unknown kind
401+
list3, err := c.ListSandboxesByKind(ctx, "unknown-kind")
402+
assert.NoError(t, err)
403+
assert.Len(t, list3, 0)
404+
}
405+
406+
// TestValkeyStore_LoadSandboxesBySessionIDs_OrphanedZSetEntry verifies that
407+
// loadSandboxesBySessionIDs skips session IDs whose hash key has been evicted
408+
// from Valkey (orphaned sorted-set entry) instead of aborting the entire batch.
409+
func TestValkeyStore_LoadSandboxesBySessionIDs_OrphanedZSetEntry(t *testing.T) {
410+
ctx := context.Background()
411+
c, mr := newValkeyTestClient(t)
412+
413+
now := time.Now().UTC().Truncate(time.Second)
414+
415+
sb1 := newTestSandbox("sb-orphan", "sess-orphan", now.Add(-1*time.Hour))
416+
sb2 := newTestSandbox("sb-alive", "sess-alive", now.Add(-2*time.Hour))
417+
418+
if err := c.StoreSandbox(ctx, sb1); err != nil {
419+
t.Fatalf("StoreSandbox sb1 error: %v", err)
420+
}
421+
if err := c.StoreSandbox(ctx, sb2); err != nil {
422+
t.Fatalf("StoreSandbox sb2 error: %v", err)
423+
}
424+
425+
// Simulate Valkey evicting the hash key for sb1 while leaving its zset entry.
426+
mr.Del(c.sessionKey("sess-orphan"))
427+
428+
result, err := c.loadSandboxesBySessionIDs(ctx, []string{"sess-orphan", "sess-alive"})
429+
if err != nil {
430+
t.Fatalf("expected no error with orphaned zset entry, got: %v", err)
431+
}
432+
if len(result) != 1 {
433+
t.Fatalf("expected 1 sandbox (the non-evicted one), got %d", len(result))
434+
}
435+
if result[0].SandboxID != "sb-alive" {
436+
t.Fatalf("expected sb-alive, got %s", result[0].SandboxID)
437+
}
438+
}

pkg/workloadmanager/auth_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ import (
3131
)
3232

3333
const (
34-
testToken = "test-token"
34+
testToken = "test-token"
3535
testServiceAccount = "system:serviceaccount:default:test-sa"
3636
)
3737

@@ -85,7 +85,7 @@ func TestAuthMiddleware_InvalidHeaderFormat(t *testing.T) {
8585
name: "no Bearer prefix",
8686
header: "token123",
8787
expectedBodyPart: "Invalid authorization header format",
88-
},
88+
},
8989
{
9090
name: "wrong prefix",
9191
header: "Basic token123",

pkg/workloadmanager/client_cache_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ import (
2828
)
2929

3030
const (
31-
jwtHeader = `{"alg":"HS256","typ":"JWT"}`
31+
jwtHeader = `{"alg":"HS256","typ":"JWT"}`
3232
testCacheKey = "default:test-sa"
3333
)
3434

pkg/workloadmanager/garbage_collection_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,9 @@ func (nopStore) DeleteSandboxBySessionID(_ context.Context, _ string) error { r
4646
func (nopStore) ListExpiredSandboxes(_ context.Context, _ time.Time, _ int64) ([]*types.SandboxInfo, error) {
4747
return nil, nil
4848
}
49+
func (nopStore) ListSandboxesByKind(_ context.Context, _ string) ([]*types.SandboxInfo, error) {
50+
return nil, nil
51+
}
4952
func (nopStore) ListInactiveSandboxes(_ context.Context, _ time.Time, _ int64) ([]*types.SandboxInfo, error) {
5053
return nil, nil
5154
}

0 commit comments

Comments
 (0)