Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pkg/common/types/sandbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ type SandboxInfo struct {
// during ListInactiveSandboxes. It is intentionally excluded from JSON serialization.
LastActivityAt time.Time `json:"-"`
Status string `json:"status"`
// CreatedBy holds the Kubernetes service account name (without the namespace prefix)
// that created this sandbox. It is used by the GET handlers to enforce per-service-account
// ownership: only the creating SA can retrieve its own session entrypoints.
CreatedBy string `json:"createdBy,omitempty"`
}

type SandboxEntryPoint struct {
Expand Down
11 changes: 10 additions & 1 deletion pkg/router/jwt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,16 @@ func TestGetPrivateKeyPEM(t *testing.T) {
privateKey, err := x509.ParsePKCS1PrivateKey(block.Bytes)
assert.NoError(t, err)
assert.NotNil(t, privateKey)
assert.Equal(t, manager.privateKey, privateKey)
// Compare keys mathematically instead of by object equality
assert.Equal(t, manager.privateKey.PublicKey.N, privateKey.PublicKey.N, "Public key N should match")
assert.Equal(t, manager.privateKey.PublicKey.E, privateKey.PublicKey.E, "Public key E should match")
assert.Equal(t, 0, manager.privateKey.D.Cmp(privateKey.D), "Private exponent D should match")

// Compare primes
assert.Equal(t, len(manager.privateKey.Primes), len(privateKey.Primes), "Number of primes should match")
for i := range manager.privateKey.Primes {
assert.Equal(t, 0, manager.privateKey.Primes[i].Cmp(privateKey.Primes[i]), "Prime %d should match", i)
}
}

func TestLoadPrivateKeyPEM(t *testing.T) {
Expand Down
4 changes: 4 additions & 0 deletions pkg/router/session_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ func (f *fakeStoreClient) ListExpiredSandboxes(_ context.Context, _ time.Time, _
return nil, nil
}

func (f *fakeStoreClient) ListSandboxesByKind(_ context.Context, _ string) ([]*types.SandboxInfo, error) {
return nil, nil
}

func (f *fakeStoreClient) ListInactiveSandboxes(_ context.Context, _ time.Time, _ int64) ([]*types.SandboxInfo, error) {
return nil, nil
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/store/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ type Store interface {
UpdateSandbox(ctx context.Context, sandboxStore *types.SandboxInfo) error
// DeleteSandboxBySessionID delete sandbox by session ID
DeleteSandboxBySessionID(ctx context.Context, sessionID string) error
// ListSandboxesByKind returns all active sandboxes matching the given kind.
ListSandboxesByKind(ctx context.Context, kind string) ([]*types.SandboxInfo, error)
// ListExpiredSandboxes returns up to limit sandboxes with ExpiresAt before the given time
ListExpiredSandboxes(ctx context.Context, before time.Time, limit int64) ([]*types.SandboxInfo, error)
// ListInactiveSandboxes returns up to limit sandboxes with last-activity time before the given time
Expand Down
55 changes: 55 additions & 0 deletions pkg/store/store_redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,61 @@ func (rs *redisStore) GetSandboxBySessionID(ctx context.Context, sessionID strin
return &sandboxRedis, nil
}

// ListSandboxesByKind returns all active sandboxes matching the given kind.
// Uses SCAN to prevent blocking the Redis instance on large datasets.
func (rs *redisStore) ListSandboxesByKind(ctx context.Context, kind string) ([]*types.SandboxInfo, error) {
allSandboxes := make([]*types.SandboxInfo, 0) // Initialize as empty array, not nil
var cursor uint64
matchPattern := rs.sessionPrefix + "*"
// Note: seenKeys holds all scanned keys in memory to deduplicate results across all SCAN pages,
// as SCAN can return the same key multiple times. For extremely large datasets, this could
// have a memory impact, and a dedicated secondary index (e.g. Redis SET per kind) is recommended
// as a follow-up optimization.
seenKeys := make(map[string]bool)
Comment thread
Abhinav-kodes marked this conversation as resolved.

for {
keys, nextCursor, err := rs.cli.Scan(ctx, cursor, matchPattern, 100).Result()
if err != nil {
return nil, fmt.Errorf("ListSandboxesByKind scan failed: %w", err)
}

if len(keys) > 0 {
sessionIDs := make([]string, 0, len(keys))
for _, key := range keys {
if key == rs.expiryIndexKey || key == rs.lastActivityIndexKey {
continue
}
if seenKeys[key] {
continue
}
seenKeys[key] = true
sessionIDs = append(sessionIDs, strings.TrimPrefix(key, rs.sessionPrefix))
}

if len(sessionIDs) > 0 {
// Batch load the fetched keys
sandboxes, err := rs.loadSandboxesBySessionIDs(ctx, sessionIDs)
if err != nil {
return nil, err
}

// Filter by requested kind
for _, sb := range sandboxes {
if sb.Kind == kind {
allSandboxes = append(allSandboxes, sb)
}
}
}
}

cursor = nextCursor
if cursor == 0 {
break // Scan complete
}
}
return allSandboxes, nil
}
Comment thread
Abhinav-kodes marked this conversation as resolved.
Comment thread
Abhinav-kodes marked this conversation as resolved.
Comment thread
Abhinav-kodes marked this conversation as resolved.

func (rs *redisStore) StoreSandbox(ctx context.Context, sandboxRedis *types.SandboxInfo) error {
if sandboxRedis == nil {
return errors.New("StoreSandbox: sandbox is nil")
Expand Down
46 changes: 44 additions & 2 deletions pkg/store/store_redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ func newTestRedisClient(t *testing.T) (*redisStore, *miniredis.Miniredis) {
rs := &redisStore{
cli: redisv9.NewClient(&redisv9.Options{Addr: mr.Addr()}),
sessionPrefix: "session:",
expiryIndexKey: "sandbox:expiry",
lastActivityIndexKey: "sandbox:last_activity",
expiryIndexKey: "session:expiry",
lastActivityIndexKey: "session:last_activity",
}
return rs, mr
}
Expand Down Expand Up @@ -344,3 +344,45 @@ func TestUpdateSandboxLastActivity(t *testing.T) {
t.Fatalf("unexpected lastActivity score after update: got %v, want %v", score, newLastActivity.Unix())
}
}

func TestRedisStore_ListSandboxesByKind(t *testing.T) {
ctx := context.Background()
c, _ := newTestRedisClient(t)

now := time.Now().UTC()
Comment thread
Abhinav-kodes marked this conversation as resolved.
sb1 := newTestSandbox("sb-1", "sess-1", now.Add(1*time.Hour))
sb1.Kind = types.AgentRuntimeKind

sb2 := newTestSandbox("sb-2", "sess-2", now.Add(1*time.Hour))
sb2.Kind = types.CodeInterpreterKind

sb3 := newTestSandbox("sb-3", "sess-3", now.Add(1*time.Hour))
sb3.Kind = types.AgentRuntimeKind

assert.NoError(t, c.StoreSandbox(ctx, sb1))
assert.NoError(t, c.StoreSandbox(ctx, sb2))
assert.NoError(t, c.StoreSandbox(ctx, sb3))

// List AgentRuntimeKind
list, err := c.ListSandboxesByKind(ctx, types.AgentRuntimeKind)
assert.NoError(t, err)
assert.Len(t, list, 2)

ids := map[string]bool{}
for _, sb := range list {
ids[sb.SessionID] = true
}
assert.True(t, ids["sess-1"])
assert.True(t, ids["sess-3"])

// List CodeInterpreterKind
list2, err := c.ListSandboxesByKind(ctx, types.CodeInterpreterKind)
assert.NoError(t, err)
assert.Len(t, list2, 1)
assert.Equal(t, "sess-2", list2[0].SessionID)

// List unknown kind
list3, err := c.ListSandboxesByKind(ctx, "unknown-kind")
assert.NoError(t, err)
assert.Len(t, list3, 0)
}
74 changes: 68 additions & 6 deletions pkg/store/store_valkey.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,19 +111,26 @@ func (vs *valkeyStore) loadSandboxesBySessionIDs(ctx context.Context, sessionIDs
sessionIDKeys = append(sessionIDKeys, vs.sessionKey(sessionID))
}
// MGet should in same slot
stingSliceResults, err := vs.cli.Do(ctx, vs.cli.B().Mget().Key(sessionIDKeys...).Build()).AsStrSlice()
valkeyResults, err := vs.cli.Do(ctx, vs.cli.B().Mget().Key(sessionIDKeys...).Build()).ToArray()
Comment thread
Abhinav-kodes marked this conversation as resolved.
if err != nil {
return nil, fmt.Errorf("loadSandboxesBySessionIDs: Valkey MGet sandboxes failed: %w", err)
}

if len(stingSliceResults) > len(sessionIDKeys) {
return nil, fmt.Errorf("unexpected MGet result size: %d, param size: %d", len(stingSliceResults), len(sessionIDKeys))
if len(valkeyResults) != len(sessionIDKeys) {
return nil, fmt.Errorf("unexpected MGet result size: %d, expected: %d", len(valkeyResults), len(sessionIDKeys))
}

sandboxResults := make([]*types.SandboxInfo, 0, len(stingSliceResults))
for i, sandboxObjString := range stingSliceResults {
sandboxResults := make([]*types.SandboxInfo, 0, len(valkeyResults))
for i, msg := range valkeyResults {
if msg.IsNil() {
// key does not exist, ignore
continue
}
sandboxObjString, err := msg.ToString()
if err != nil {
return nil, fmt.Errorf("parse sandbox string failed: %w, index: %v, sessionID: %v", err, i, sessionIDs[i])
}
if len(sandboxObjString) == 0 {
// sandboxObjString is empty while sessionKey not exist, ignore
continue
}
var sandboxRedis types.SandboxInfo
Expand Down Expand Up @@ -168,6 +175,61 @@ func (vs *valkeyStore) GetSandboxBySessionID(ctx context.Context, sessionID stri
return &sandboxRedis, nil
}

// ListSandboxesByKind returns all active sandboxes matching the given kind.
// Uses SCAN to prevent blocking the Valkey instance on large datasets.
func (vs *valkeyStore) ListSandboxesByKind(ctx context.Context, kind string) ([]*types.SandboxInfo, error) {
allSandboxes := make([]*types.SandboxInfo, 0) // Initialize as empty array, not nil
cursor := uint64(0)
matchPattern := vs.sessionPrefix + "*"
// Note: seenKeys holds all scanned keys in memory to deduplicate results across all SCAN pages,
// as SCAN can return the same key multiple times. For extremely large datasets, this could
// have a memory impact, and a dedicated secondary index (e.g. Valkey SET per kind) is recommended
// as a follow-up optimization.
seenKeys := make(map[string]bool)

for {
scanRes, err := vs.cli.Do(ctx, vs.cli.B().Scan().Cursor(cursor).Match(matchPattern).Count(100).Build()).AsScanEntry()
if err != nil {
return nil, fmt.Errorf("ListSandboxesByKind scan failed: %w", err)
}

if len(scanRes.Elements) > 0 {
sessionIDs := make([]string, 0, len(scanRes.Elements))
for _, key := range scanRes.Elements {
if key == vs.expiryIndexKey || key == vs.lastActivityIndexKey {
continue
}
if seenKeys[key] {
continue
}
seenKeys[key] = true
sessionIDs = append(sessionIDs, strings.TrimPrefix(key, vs.sessionPrefix))
}

if len(sessionIDs) > 0 {
// Batch load the fetched keys
sandboxes, err := vs.loadSandboxesBySessionIDs(ctx, sessionIDs)
if err != nil {
return nil, err
}

// Filter by requested kind
for _, sb := range sandboxes {
if sb.Kind == kind {
allSandboxes = append(allSandboxes, sb)
}
}
}
}

cursor = scanRes.Cursor
if cursor == 0 {
break // Scan complete
}
}
return allSandboxes, nil
}
Comment thread
Abhinav-kodes marked this conversation as resolved.
Comment thread
Abhinav-kodes marked this conversation as resolved.
Comment thread
Abhinav-kodes marked this conversation as resolved.

// StoreSandbox store sandbox into storage
func (vs *valkeyStore) StoreSandbox(ctx context.Context, sandboxStore *types.SandboxInfo) error {
if sandboxStore == nil {
Expand Down
80 changes: 78 additions & 2 deletions pkg/store/store_valkey_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,8 @@ func newValkeyTestClient(t *testing.T) (*valkeyStore, *miniredis.Miniredis) {
rs := &valkeyStore{
cli: client,
sessionPrefix: "session:",
expiryIndexKey: "sandbox:expiry",
lastActivityIndexKey: "sandbox:last_activity",
expiryIndexKey: "session:expiry",
lastActivityIndexKey: "session:last_activity",
}
return rs, mr
}
Expand Down Expand Up @@ -360,3 +360,79 @@ func TestValkeyStore_UpdateSandboxLastActivity(t *testing.T) {
assert.Error(t, err)
assert.True(t, errors.Is(err, ErrNotFound))
}

func TestValkeyStore_ListSandboxesByKind(t *testing.T) {
ctx := context.Background()
c, _ := newValkeyTestClient(t)

now := time.Now().UTC()
Comment thread
Abhinav-kodes marked this conversation as resolved.
sb1 := newTestSandbox("sb-1", "sess-1", now.Add(1*time.Hour))
sb1.Kind = types.AgentRuntimeKind

sb2 := newTestSandbox("sb-2", "sess-2", now.Add(1*time.Hour))
sb2.Kind = types.CodeInterpreterKind

sb3 := newTestSandbox("sb-3", "sess-3", now.Add(1*time.Hour))
sb3.Kind = types.AgentRuntimeKind

assert.NoError(t, c.StoreSandbox(ctx, sb1))
assert.NoError(t, c.StoreSandbox(ctx, sb2))
assert.NoError(t, c.StoreSandbox(ctx, sb3))

// List AgentRuntimeKind
list, err := c.ListSandboxesByKind(ctx, types.AgentRuntimeKind)
assert.NoError(t, err)
assert.Len(t, list, 2)

ids := map[string]bool{}
for _, sb := range list {
ids[sb.SessionID] = true
}
assert.True(t, ids["sess-1"])
assert.True(t, ids["sess-3"])

// List CodeInterpreterKind
list2, err := c.ListSandboxesByKind(ctx, types.CodeInterpreterKind)
assert.NoError(t, err)
assert.Len(t, list2, 1)
assert.Equal(t, "sess-2", list2[0].SessionID)

// List unknown kind
list3, err := c.ListSandboxesByKind(ctx, "unknown-kind")
assert.NoError(t, err)
assert.Len(t, list3, 0)
}

// TestValkeyStore_LoadSandboxesBySessionIDs_OrphanedZSetEntry verifies that
// loadSandboxesBySessionIDs skips session IDs whose hash key has been evicted
// from Valkey (orphaned sorted-set entry) instead of aborting the entire batch.
func TestValkeyStore_LoadSandboxesBySessionIDs_OrphanedZSetEntry(t *testing.T) {
ctx := context.Background()
c, mr := newValkeyTestClient(t)

now := time.Now().UTC().Truncate(time.Second)

sb1 := newTestSandbox("sb-orphan", "sess-orphan", now.Add(-1*time.Hour))
sb2 := newTestSandbox("sb-alive", "sess-alive", now.Add(-2*time.Hour))

if err := c.StoreSandbox(ctx, sb1); err != nil {
t.Fatalf("StoreSandbox sb1 error: %v", err)
}
if err := c.StoreSandbox(ctx, sb2); err != nil {
t.Fatalf("StoreSandbox sb2 error: %v", err)
}

// Simulate Valkey evicting the hash key for sb1 while leaving its zset entry.
mr.Del(c.sessionKey("sess-orphan"))

result, err := c.loadSandboxesBySessionIDs(ctx, []string{"sess-orphan", "sess-alive"})
if err != nil {
t.Fatalf("expected no error with orphaned zset entry, got: %v", err)
}
if len(result) != 1 {
t.Fatalf("expected 1 sandbox (the non-evicted one), got %d", len(result))
}
if result[0].SandboxID != "sb-alive" {
t.Fatalf("expected sb-alive, got %s", result[0].SandboxID)
}
}
16 changes: 11 additions & 5 deletions pkg/workloadmanager/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,18 +47,24 @@ import (
//
// POST /v1/sandboxes (CreateSandbox):
// - Any authenticated user can create sandboxes
// - The creator's service account name is stored in the sandbox metadata
// - The creator's service account name is stamped into SandboxInfo.CreatedBy
// and persisted in the store at creation time
//
// GET /v1/sandboxes (ListSandboxes):
// - Users can only list sandboxes they created
// - Users can only list sandboxes they created (CreatedBy == serviceAccountName)
// - For backward compatibility, sandboxes without a CreatedBy field are filtered
// by namespace only (entries created before this field was introduced)
//
// GET /v1/sandboxes/{sandboxId} (GetSandbox):
// - Users can only access sandboxes they created
// - Access is checked via checkSandboxAccess() function
// - Users can only access sandboxes they created (CreatedBy == serviceAccountName)
// - Access is first checked by namespace, then by CreatedBy
// - Returns 404 (not 403) for unauthorized or missing sessions to prevent
// session ID enumeration attacks
//
// DELETE /v1/sandboxes/{sandboxId} (DeleteSandbox):
// - Users can only delete sandboxes they created
// - Access is checked via checkSandboxAccess() function
// - Access is enforced by Kubernetes RBAC via the user's own dynamicClient;
// the Kubernetes API itself rejects cross-account deletes
//
// CONNECT /v1/sandboxes/{sandboxId} (Tunnel):
// - Users can only establish tunnels to sandboxes they created
Expand Down
Loading
Loading