-
Notifications
You must be signed in to change notification settings - Fork 252
fix: sync codex plan type on reset #142
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -38,21 +38,22 @@ import ( | |
|
|
||
| // Handler 管理后台 API 处理器 | ||
| type Handler struct { | ||
| store *auth.Store | ||
| cache cache.TokenCache | ||
| db *database.DB | ||
| rateLimiter *proxy.RateLimiter | ||
| refreshAccount func(context.Context, int64) error | ||
| cpuSampler *cpuSampler | ||
| startedAt time.Time | ||
| pgMaxConns int | ||
| redisPoolSize int | ||
| databaseDriver string | ||
| databaseLabel string | ||
| cacheDriver string | ||
| cacheLabel string | ||
| adminSecretEnv string | ||
| imageProxy *proxy.Handler | ||
| store *auth.Store | ||
| cache cache.TokenCache | ||
| db *database.DB | ||
| rateLimiter *proxy.RateLimiter | ||
| refreshAccount func(context.Context, int64) error | ||
| syncAccountPlanOnReset func(context.Context, *auth.Account) error | ||
| cpuSampler *cpuSampler | ||
| startedAt time.Time | ||
| pgMaxConns int | ||
| redisPoolSize int | ||
| databaseDriver string | ||
| databaseLabel string | ||
| cacheDriver string | ||
| cacheLabel string | ||
| adminSecretEnv string | ||
| imageProxy *proxy.Handler | ||
|
|
||
| // 图表聚合内存缓存(10秒 TTL) | ||
| chartCacheMu sync.RWMutex | ||
|
|
@@ -170,6 +171,7 @@ func NewHandler(store *auth.Store, db *database.DB, tc cache.TokenCache, rl *pro | |
| handler.imageProxy.SetRuntimeCache(tc) | ||
| } | ||
| handler.refreshAccount = handler.refreshSingleAccount | ||
| handler.syncAccountPlanOnReset = handler.syncSingleAccountPlanOnReset | ||
| if db != nil { | ||
| if err := db.MarkInterruptedImageJobs(context.Background()); err != nil { | ||
| log.Printf("标记中断生图任务失败: %v", err) | ||
|
|
@@ -2453,6 +2455,7 @@ func (h *Handler) ResetAccountStatus(c *gin.Context) { | |
|
|
||
| h.store.ClearCooldown(acc) | ||
| acc.ClearUsageCache() | ||
| h.syncAccountPlanAfterReset(c.Request.Context(), acc) | ||
| writeMessage(c, http.StatusOK, "账号状态已重置") | ||
| } | ||
|
|
||
|
|
@@ -2476,6 +2479,7 @@ func (h *Handler) BatchResetStatus(c *gin.Context) { | |
| } | ||
| h.store.ClearCooldown(acc) | ||
| acc.ClearUsageCache() | ||
| h.syncAccountPlanAfterReset(c.Request.Context(), acc) | ||
| success++ | ||
| } | ||
|
|
||
|
|
@@ -2486,6 +2490,38 @@ func (h *Handler) BatchResetStatus(c *gin.Context) { | |
| }) | ||
| } | ||
|
|
||
| func (h *Handler) syncAccountPlanAfterReset(parent context.Context, acc *auth.Account) { | ||
| if h == nil || h.syncAccountPlanOnReset == nil || acc == nil { | ||
| return | ||
| } | ||
| if parent == nil { | ||
| parent = context.Background() | ||
| } | ||
| ctx, cancel := context.WithTimeout(parent, 15*time.Second) | ||
| defer cancel() | ||
| if err := h.syncAccountPlanOnReset(ctx, acc); err != nil { | ||
| log.Printf("[account %d] sync Codex plan type after reset failed: %v", acc.DBID, err) | ||
| } | ||
| } | ||
|
|
||
| func (h *Handler) syncSingleAccountPlanOnReset(ctx context.Context, acc *auth.Account) error { | ||
| if h == nil || h.store == nil || acc == nil || acc.IsOpenAIResponsesAPI() || acc.GetAccessToken() == "" { | ||
| return nil | ||
| } | ||
|
Comment on lines
+2508
to
+2510
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Reset-time plan sync is skipped for accounts that most need it. The early return on empty 🤖 Prompt for AI Agents |
||
| model, err := h.connectionTestModelForAccount(ctx, acc, "") | ||
| if err != nil { | ||
| return err | ||
| } | ||
| resp, err := proxy.ExecuteRequest(ctx, acc, buildTestPayload(model), "", h.store.ResolveProxyForAccount(acc), "", nil, nil) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| defer resp.Body.Close() | ||
| _, _ = io.Copy(io.Discard, resp.Body) | ||
| proxy.SyncCodexUsageState(h.store, acc, resp) | ||
| return nil | ||
| } | ||
|
|
||
| func (h *Handler) refreshSingleAccount(ctx context.Context, id int64) error { | ||
| if h == nil || h.store == nil { | ||
| return fmt.Errorf("账号池未初始化") | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -3559,6 +3559,39 @@ func (s *Store) PersistUsageSnapshot(acc *Account, pct7d float64) { | |||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| // UpdateAccountPlanType persists the latest Codex plan type observed from upstream headers. | ||||||||||||||||||||||||||||||||||||||||
| func (s *Store) UpdateAccountPlanType(acc *Account, planType string) bool { | ||||||||||||||||||||||||||||||||||||||||
| if s == nil || acc == nil { | ||||||||||||||||||||||||||||||||||||||||
| return false | ||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||
| plan := strings.ToLower(strings.TrimSpace(planType)) | ||||||||||||||||||||||||||||||||||||||||
| if plan == "" { | ||||||||||||||||||||||||||||||||||||||||
| return false | ||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| acc.mu.Lock() | ||||||||||||||||||||||||||||||||||||||||
| changed := acc.PlanType != plan | ||||||||||||||||||||||||||||||||||||||||
| if changed { | ||||||||||||||||||||||||||||||||||||||||
| acc.PlanType = plan | ||||||||||||||||||||||||||||||||||||||||
| acc.recomputeSchedulerLocked(atomic.LoadInt64(&s.maxConcurrency)) | ||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||
| acc.mu.Unlock() | ||||||||||||||||||||||||||||||||||||||||
| if changed { | ||||||||||||||||||||||||||||||||||||||||
| s.fastSchedulerUpdate(acc) | ||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| if s.db == nil || !changed { | ||||||||||||||||||||||||||||||||||||||||
| return changed | ||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) | ||||||||||||||||||||||||||||||||||||||||
| defer cancel() | ||||||||||||||||||||||||||||||||||||||||
| if err := s.db.UpdateCredentials(ctx, acc.DBID, map[string]interface{}{"plan_type": plan}); err != nil { | ||||||||||||||||||||||||||||||||||||||||
| log.Printf("[璐﹀彿 %d] 鎸佷箙鍖?plan_type 澶辫触: %v", acc.DBID, err) | ||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+3583
to
+3591
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Persist the observed plan even when memory already matches it. This method currently skips Suggested fix- if s.db == nil || !changed {
+ if s.db == nil {
return changed
}
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
if err := s.db.UpdateCredentials(ctx, acc.DBID, map[string]interface{}{"plan_type": plan}); err != nil {
log.Printf("[璐﹀彿 %d] 鎸佷箙鍖?plan_type 澶辫触: %v", acc.DBID, err)
}
return changed📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||||||||||||||||||||||||
| return changed | ||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| // ApplyUsageLimitMetadata applies metadata returned by Codex usage_limit_reached errors. | ||||||||||||||||||||||||||||||||||||||||
| func (s *Store) ApplyUsageLimitMetadata(acc *Account, planType string, resetAt time.Time) { | ||||||||||||||||||||||||||||||||||||||||
| if acc == nil { | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Batch reset now serially waits on one upstream probe per account.
syncAccountPlanAfterResetcan block for up to 15 seconds, and this loop invokes it inline for every ID. That makesbatch-reset-statuslatency scale with account count, so a modest batch can exceed proxy/request timeouts even though the actual reset work already succeeded. This should be decoupled from the response path or run through a bounded async worker pool.