Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -147,3 +147,7 @@ func TestIdempotencyRepo_StatusTransition_ToSucceeded(t *testing.T) {
require.Equal(t, `{"ok":true}`, *got.ResponseBody)
require.Nil(t, got.LockedUntil)
}

func ptrTime(t time.Time) *time.Time {
return &t
}
142 changes: 107 additions & 35 deletions backend/internal/repository/scheduler_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"strconv"
"strings"
"time"

"github.com/Wei-Shaw/sub2api/internal/service"
Expand All @@ -16,6 +17,7 @@ const (
schedulerOutboxWatermarkKey = "sched:outbox:watermark"
schedulerAccountPrefix = "sched:acc:"
schedulerAccountMetaPrefix = "sched:meta:"
schedulerAccountLastUsedKey = "sched:acc:last_used:"
schedulerActivePrefix = "sched:active:"
schedulerReadyPrefix = "sched:ready:"
schedulerVersionPrefix = "sched:ver:"
Expand Down Expand Up @@ -84,23 +86,33 @@ func (c *schedulerCache) GetSnapshot(ctx context.Context, bucket service.Schedul
}

keys := make([]string, 0, len(ids))
lastUsedKeys := make([]string, 0, len(ids))
for _, id := range ids {
keys = append(keys, schedulerAccountMetaKey(id))
lastUsedKeys = append(lastUsedKeys, schedulerLastUsedKey(id))
}
values, err := c.mgetChunked(ctx, keys)
if err != nil {
return nil, false, err
}
lastUsedValues, err := c.mgetChunked(ctx, lastUsedKeys)
if err != nil {
return nil, false, err
}

accounts := make([]*service.Account, 0, len(values))
for _, val := range values {
for i, val := range values {
if val == nil {
return nil, false, nil
}
account, err := decodeCachedAccount(val)
if err != nil {
return nil, false, err
}
// LastUsedAt 是高频热字段,读取时优先使用 side key 覆盖。
if err := applySchedulerLastUsed(account, lastUsedValues[i]); err != nil {
return nil, false, err
}
accounts = append(accounts, account)
}

Expand Down Expand Up @@ -159,15 +171,23 @@ func (c *schedulerCache) SetSnapshot(ctx context.Context, bucket service.Schedul
}

func (c *schedulerCache) GetAccount(ctx context.Context, accountID int64) (*service.Account, error) {
key := schedulerAccountKey(strconv.FormatInt(accountID, 10))
id := strconv.FormatInt(accountID, 10)
key := schedulerAccountKey(id)
val, err := c.rdb.Get(ctx, key).Result()
if err == redis.Nil {
return nil, nil
}
if err != nil {
return nil, err
}
return decodeCachedAccount(val)
account, err := decodeCachedAccount(val)
if err != nil {
return nil, err
}
if err := c.applySchedulerLastUsedFromKey(ctx, account, schedulerLastUsedKey(id)); err != nil {
return nil, err
}
return account, nil
}

func (c *schedulerCache) SetAccount(ctx context.Context, account *service.Account) error {
Expand All @@ -182,48 +202,33 @@ func (c *schedulerCache) DeleteAccount(ctx context.Context, accountID int64) err
return nil
}
id := strconv.FormatInt(accountID, 10)
return c.rdb.Del(ctx, schedulerAccountKey(id), schedulerAccountMetaKey(id)).Err()
return c.rdb.Del(ctx, schedulerAccountKey(id), schedulerAccountMetaKey(id), schedulerLastUsedKey(id)).Err()
}

func (c *schedulerCache) UpdateLastUsed(ctx context.Context, updates map[int64]time.Time) error {
if len(updates) == 0 {
return nil
}

keys := make([]string, 0, len(updates))
ids := make([]int64, 0, len(updates))
for id := range updates {
keys = append(keys, schedulerAccountKey(strconv.FormatInt(id, 10)))
ids = append(ids, id)
}

values, err := c.mgetChunked(ctx, keys)
if err != nil {
return err
}

pipe := c.rdb.Pipeline()
for i, val := range values {
if val == nil {
queuedCommands := 0
for id, usedAt := range updates {
if id <= 0 {
continue
}
account, err := decodeCachedAccount(val)
if err != nil {
return err
}
account.LastUsedAt = ptrTime(updates[ids[i]])
updated, err := json.Marshal(account)
if err != nil {
return err
}
metaPayload, err := json.Marshal(buildSchedulerMetadataAccount(*account))
if err != nil {
return err
key := schedulerLastUsedKey(strconv.FormatInt(id, 10))
// 热路径只更新 side key,避免反序列化并重写整块账号 JSON。
if usedAt.IsZero() {
pipe.Del(ctx, key)
} else {
pipe.Set(ctx, key, strconv.FormatInt(usedAt.UTC().UnixNano(), 10), 0)
}
pipe.Set(ctx, keys[i], updated, 0)
pipe.Set(ctx, schedulerAccountMetaKey(strconv.FormatInt(ids[i], 10)), metaPayload, 0)
queuedCommands++
}
if queuedCommands == 0 {
return nil
}
_, err = pipe.Exec(ctx)
_, err := pipe.Exec(ctx)
return err
}

Expand Down Expand Up @@ -283,8 +288,74 @@ func schedulerAccountMetaKey(id string) string {
return schedulerAccountMetaPrefix + id
}

func ptrTime(t time.Time) *time.Time {
return &t
// schedulerLastUsedKey builds the Redis key of the hot LastUsedAt side
// value ("sched:acc:last_used:<id>") for the account with the given
// string-formatted ID.
func schedulerLastUsedKey(id string) string {
	return schedulerAccountLastUsedKey + id
}

// applySchedulerLastUsedFromKey loads the last-used side key and, when it
// exists, copies the decoded timestamp onto account.LastUsedAt. A missing
// key (redis.Nil) is not an error: the account keeps whatever value came
// from its cached JSON.
func (c *schedulerCache) applySchedulerLastUsedFromKey(ctx context.Context, account *service.Account, key string) error {
	val, err := c.rdb.Get(ctx, key).Result()
	if err != nil {
		if err == redis.Nil {
			// Side key absent — nothing to overlay.
			return nil
		}
		return err
	}
	return applySchedulerLastUsed(account, val)
}

// applySchedulerLastUsed overwrites account.LastUsedAt with the decoded
// side-key value when both an account and a cached value are present.
// A nil account, nil value, or blank cached value leaves the account
// untouched; only a malformed cached value is reported as an error.
func applySchedulerLastUsed(account *service.Account, val any) error {
	if account == nil || val == nil {
		// Nothing to apply onto, or nothing cached.
		return nil
	}
	ts, err := decodeSchedulerLastUsed(val)
	if err != nil {
		return err
	}
	if ts != nil {
		account.LastUsedAt = ts
	}
	return nil
}

func decodeSchedulerLastUsed(val any) (*time.Time, error) {
var raw string
switch typed := val.(type) {
case string:
raw = typed
case []byte:
raw = string(typed)
default:
return nil, fmt.Errorf("unexpected last_used cache type: %T", val)
}

raw = strings.TrimSpace(raw)
if raw == "" {
return nil, nil
}
nanos, err := strconv.ParseInt(raw, 10, 64)
if err == nil {
parsed := time.Unix(0, nanos).UTC()
if nanos > -1000000000000 && nanos < 1000000000000 {
parsed = time.Unix(nanos, 0).UTC()
}
return &parsed, nil
}
parsed, err := time.Parse(time.RFC3339Nano, raw)
if err == nil {
utc := parsed.UTC()
return &utc, nil
}
return nil, fmt.Errorf("invalid last_used cache value: %q", raw)
}

// writeSchedulerLastUsed queues pipeline commands that sync the
// LastUsedAt side key for one account: a nil timestamp deletes the key,
// otherwise the value is stored as UTC unix nanoseconds with no TTL.
// NOTE(review): by Go convention ctx should be the first parameter;
// left as-is here to avoid touching existing call sites.
func writeSchedulerLastUsed(pipe redis.Pipeliner, ctx context.Context, id string, lastUsedAt *time.Time) {
	key := schedulerLastUsedKey(id)
	if lastUsedAt == nil {
		// No last-used value on the account: clear any stale side key.
		pipe.Del(ctx, key)
		return
	}
	pipe.Set(ctx, key, strconv.FormatInt(lastUsedAt.UTC().UnixNano(), 10), 0)
}

func decodeCachedAccount(val any) (*service.Account, error) {
Expand Down Expand Up @@ -336,6 +407,7 @@ func (c *schedulerCache) writeAccounts(ctx context.Context, accounts []service.A
id := strconv.FormatInt(account.ID, 10)
pipe.Set(ctx, schedulerAccountKey(id), fullPayload, 0)
pipe.Set(ctx, schedulerAccountMetaKey(id), metaPayload, 0)
writeSchedulerLastUsed(pipe, ctx, id, account.LastUsedAt)
pending++
if pending >= c.writeChunkSize {
if err := flush(); err != nil {
Expand Down
71 changes: 71 additions & 0 deletions backend/internal/repository/scheduler_cache_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package repository

import (
"context"
"strconv"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -86,3 +87,73 @@ func TestSchedulerCacheSnapshotUsesSlimMetadataButKeepsFullAccount(t *testing.T)
require.Equal(t, "secret-access-token", full.GetCredential("access_token"))
require.Equal(t, strings.Repeat("x", 4096), full.GetCredential("huge_blob"))
}

// TestSchedulerCacheUpdateLastUsedUsesSideKeyAndKeepsAccountJSON verifies
// that UpdateLastUsed touches only the dedicated last-used side key: the
// full account JSON and the snapshot metadata JSON must remain
// byte-identical, while GetAccount and GetSnapshot still surface the
// refreshed timestamp by overlaying the side-key value.
func TestSchedulerCacheUpdateLastUsedUsesSideKeyAndKeepsAccountJSON(t *testing.T) {
	ctx := context.Background()
	rdb := testRedis(t)
	cache := NewSchedulerCache(rdb)

	// Seed an account carrying bulky credential/extra blobs so an
	// accidental full-JSON rewrite would be easy to detect via byte
	// comparison below.
	initialUsedAt := time.Now().UTC().Truncate(time.Second)
	account := &service.Account{
		ID:          202,
		Name:        "hot-field-account",
		Platform:    service.PlatformOpenAI,
		Type:        service.AccountTypeAPIKey,
		Status:      service.StatusActive,
		Schedulable: true,
		Concurrency: 2,
		LastUsedAt:  &initialUsedAt,
		Credentials: map[string]any{
			"api_key":   "k-1",
			"huge_blob": strings.Repeat("a", 2048),
		},
		Extra: map[string]any{
			"quota_limit": 100,
			"notes_blob":  strings.Repeat("b", 2048),
		},
	}
	require.NoError(t, cache.SetAccount(ctx, account))

	// Capture the stored JSON payloads before the hot-field update.
	id := strconv.FormatInt(account.ID, 10)
	accountBefore, err := rdb.Get(ctx, schedulerAccountKey(id)).Result()
	require.NoError(t, err)
	metaBefore, err := rdb.Get(ctx, schedulerAccountMetaKey(id)).Result()
	require.NoError(t, err)

	latestUsedAt := initialUsedAt.Add(37 * time.Second)
	require.NoError(t, cache.UpdateLastUsed(ctx, map[int64]time.Time{
		account.ID: latestUsedAt,
	}))

	// The account and metadata JSON must be untouched by UpdateLastUsed.
	accountAfter, err := rdb.Get(ctx, schedulerAccountKey(id)).Result()
	require.NoError(t, err)
	metaAfter, err := rdb.Get(ctx, schedulerAccountMetaKey(id)).Result()
	require.NoError(t, err)
	require.Equal(t, accountBefore, accountAfter, "更新 LastUsedAt 不应重写完整账号 JSON")
	require.Equal(t, metaBefore, metaAfter, "更新 LastUsedAt 不应重写快照元数据 JSON")

	// The side key must hold the new timestamp as unix nanoseconds.
	lastUsedRaw, err := rdb.Get(ctx, schedulerLastUsedKey(id)).Result()
	require.NoError(t, err)
	require.Equal(t, strconv.FormatInt(latestUsedAt.UTC().UnixNano(), 10), lastUsedRaw)

	// Single-account reads overlay the side-key value onto the account.
	cached, err := cache.GetAccount(ctx, account.ID)
	require.NoError(t, err)
	require.NotNil(t, cached)
	require.NotNil(t, cached.LastUsedAt)
	require.WithinDuration(t, latestUsedAt.UTC(), *cached.LastUsedAt, time.Nanosecond)

	// Snapshot reads must also pick up a later side-key update.
	bucket := service.SchedulerBucket{GroupID: 9, Platform: service.PlatformOpenAI, Mode: service.SchedulerModeSingle}
	require.NoError(t, cache.SetSnapshot(ctx, bucket, []service.Account{*account}))

	latestForSnapshot := latestUsedAt.Add(13 * time.Second)
	require.NoError(t, cache.UpdateLastUsed(ctx, map[int64]time.Time{
		account.ID: latestForSnapshot,
	}))

	snapshot, hit, err := cache.GetSnapshot(ctx, bucket)
	require.NoError(t, err)
	require.True(t, hit)
	require.Len(t, snapshot, 1)
	require.NotNil(t, snapshot[0].LastUsedAt)
	require.WithinDuration(t, latestForSnapshot.UTC(), *snapshot[0].LastUsedAt, time.Nanosecond)
}
Loading