From 50dae3e7146aea255f8099ec98511d1321b69122 Mon Sep 17 00:00:00 2001 From: Qi HU Date: Wed, 6 May 2026 14:14:49 +0800 Subject: [PATCH 1/2] feat: add account usage statistics view --- .../internal/handler/admin/account_handler.go | 43 ++ backend/internal/repository/usage_log_repo.go | 66 ++ backend/internal/server/api_contract_test.go | 8 + backend/internal/server/routes/admin.go | 1 + .../internal/service/account_usage_service.go | 25 + frontend/src/api/admin/accounts.ts | 29 +- frontend/src/components/layout/AppSidebar.vue | 1 + frontend/src/i18n/locales/en.ts | 46 ++ frontend/src/i18n/locales/zh.ts | 46 ++ frontend/src/router/index.ts | 12 + frontend/src/views/admin/AccountStatsView.vue | 607 ++++++++++++++++++ 11 files changed, 883 insertions(+), 1 deletion(-) create mode 100644 frontend/src/views/admin/AccountStatsView.vue diff --git a/backend/internal/handler/admin/account_handler.go b/backend/internal/handler/admin/account_handler.go index ffab74d6a7a..d38156dee2b 100644 --- a/backend/internal/handler/admin/account_handler.go +++ b/backend/internal/handler/admin/account_handler.go @@ -1667,6 +1667,49 @@ func (h *AccountHandler) GetUsage(c *gin.Context) { response.Success(c, usage) } +// GetRecentUsers handles getting recent users of an account +// GET /api/v1/admin/accounts/:id/recent-users +func (h *AccountHandler) GetRecentUsers(c *gin.Context) { + accountID, err := strconv.ParseInt(c.Param("id"), 10, 64) + if err != nil { + response.BadRequest(c, "Invalid account ID") + return + } + + var users []service.RecentAccountUser + startDateStr := strings.TrimSpace(c.Query("start_date")) + endDateStr := strings.TrimSpace(c.Query("end_date")) + if startDateStr != "" || endDateStr != "" { + if startDateStr == "" || endDateStr == "" { + response.BadRequest(c, "start_date and end_date are required together") + return + } + userTZ := c.Query("timezone") + startTime, parseErr := timezone.ParseInUserLocation("2006-01-02", startDateStr, userTZ) + if parseErr != nil { + response.BadRequest(c, "Invalid start_date format, use YYYY-MM-DD") + return + } + endTime, parseErr := timezone.ParseInUserLocation("2006-01-02", endDateStr, userTZ) + if parseErr != nil { + response.BadRequest(c, "Invalid end_date format, use YYYY-MM-DD") + return + } + users, err = h.accountUsageService.GetAccountUsersByTimeRange(c.Request.Context(), accountID, startTime, endTime.AddDate(0, 0, 1)) + } else { + users, err = h.accountUsageService.GetRecentAccountUsers(c.Request.Context(), accountID, 5) + } + if err != nil { + response.ErrorFrom(c, err) + return + } + if users == nil { + users = []service.RecentAccountUser{} + } + + response.Success(c, gin.H{"users": users}) +} + // ClearRateLimit handles clearing account rate limit status // POST /api/v1/admin/accounts/:id/clear-rate-limit func (h *AccountHandler) ClearRateLimit(c *gin.Context) { diff --git a/backend/internal/repository/usage_log_repo.go b/backend/internal/repository/usage_log_repo.go index f2fb87da33e..06e9ce260e7 100644 --- a/backend/internal/repository/usage_log_repo.go +++ b/backend/internal/repository/usage_log_repo.go @@ -4392,3 +4392,69 @@ func setToSlice(set map[int64]struct{}) []int64 { } return out } + +// GetRecentAccountUsers returns users who used the account in the last N minutes +func (r *usageLogRepository) GetRecentAccountUsers(ctx context.Context, accountID int64, minutes int) ([]service.RecentAccountUser, error) { + query := ` + SELECT + ul.user_id, + COALESCE(u.email, '') as email, + COUNT(*) as requests, + COALESCE(SUM(COALESCE(ul.account_stats_cost, ul.total_cost) * COALESCE(ul.account_rate_multiplier, 1)), 0) as account_cost, + COALESCE(SUM(ul.actual_cost), 0) as user_cost, + MAX(ul.created_at) as last_used_at + FROM usage_logs ul + LEFT JOIN users u ON u.id = ul.user_id + WHERE ul.account_id = $1 AND ul.created_at >= NOW() - make_interval(mins => $2) + GROUP BY ul.user_id, u.email + ORDER BY last_used_at DESC + LIMIT 50 + ` + rows, err := r.db.QueryContext(ctx, query, accountID, minutes) + if err != nil { + return nil, err + } + defer rows.Close() //nolint:errcheck + var result []service.RecentAccountUser + for rows.Next() { + var u service.RecentAccountUser + if err := rows.Scan(&u.UserID, &u.Email, &u.Requests, &u.AccountCost, &u.UserCost, &u.LastUsedAt); err != nil { + return nil, err + } + result = append(result, u) + } + return result, rows.Err() +} + +// GetAccountUsersByTimeRange returns users who used the account within a selected time range. +func (r *usageLogRepository) GetAccountUsersByTimeRange(ctx context.Context, accountID int64, startTime, endTime time.Time) ([]service.RecentAccountUser, error) { + query := ` + SELECT + ul.user_id, + COALESCE(u.email, '') as email, + COUNT(*) as requests, + COALESCE(SUM(COALESCE(ul.account_stats_cost, ul.total_cost) * COALESCE(ul.account_rate_multiplier, 1)), 0) as account_cost, + COALESCE(SUM(ul.actual_cost), 0) as user_cost, + MAX(ul.created_at) as last_used_at + FROM usage_logs ul + LEFT JOIN users u ON u.id = ul.user_id + WHERE ul.account_id = $1 AND ul.created_at >= $2 AND ul.created_at < $3 + GROUP BY ul.user_id, u.email + ORDER BY requests DESC, last_used_at DESC + LIMIT 200 + ` + rows, err := r.db.QueryContext(ctx, query, accountID, startTime, endTime) + if err != nil { + return nil, err + } + defer rows.Close() //nolint:errcheck + var result []service.RecentAccountUser + for rows.Next() { + var u service.RecentAccountUser + if err := rows.Scan(&u.UserID, &u.Email, &u.Requests, &u.AccountCost, &u.UserCost, &u.LastUsedAt); err != nil { + return nil, err + } + result = append(result, u) + } + return result, rows.Err() +} diff --git a/backend/internal/server/api_contract_test.go b/backend/internal/server/api_contract_test.go index 27358865666..1bdf3f909a8 100644 --- a/backend/internal/server/api_contract_test.go +++ b/backend/internal/server/api_contract_test.go @@ -2388,6 +2388,14 @@ func (r *stubUsageLogRepo) GetAllGroupUsageSummary(ctx context.Context, todaySta return nil, errors.New("not implemented") } +func (r *stubUsageLogRepo) GetRecentAccountUsers(ctx context.Context, accountID int64, minutes int) ([]service.RecentAccountUser, error) { + return nil, nil +} + +func (r *stubUsageLogRepo) GetAccountUsersByTimeRange(ctx context.Context, accountID int64, startTime, endTime time.Time) ([]service.RecentAccountUser, error) { + return nil, nil +} + type stubSettingRepo struct { all map[string]string } diff --git a/backend/internal/server/routes/admin.go b/backend/internal/server/routes/admin.go index 5eb0d34b70a..ce8d581c10f 100644 --- a/backend/internal/server/routes/admin.go +++ b/backend/internal/server/routes/admin.go @@ -296,6 +296,7 @@ func registerAccountRoutes(admin *gin.RouterGroup, h *handler.Handlers) { accounts.GET("/:id/usage", h.Admin.Account.GetUsage) accounts.GET("/:id/today-stats", h.Admin.Account.GetTodayStats) accounts.POST("/today-stats/batch", h.Admin.Account.GetBatchTodayStats) + accounts.GET("/:id/recent-users", h.Admin.Account.GetRecentUsers) accounts.POST("/:id/clear-rate-limit", h.Admin.Account.ClearRateLimit) accounts.POST("/:id/reset-quota", h.Admin.Account.ResetQuota) accounts.GET("/:id/temp-unschedulable", h.Admin.Account.GetTempUnschedulable) diff --git a/backend/internal/service/account_usage_service.go b/backend/internal/service/account_usage_service.go index 68ba8f8ce98..99d9853d536 100644 --- a/backend/internal/service/account_usage_service.go +++ b/backend/internal/service/account_usage_service.go @@ -77,6 +77,21 @@ type UsageLogRepository interface { GetAccountStatsAggregated(ctx context.Context, accountID int64, startTime, endTime time.Time) (*usagestats.UsageStats, error) GetModelStatsAggregated(ctx context.Context, modelName string, startTime, endTime time.Time) (*usagestats.UsageStats, error) GetDailyStatsAggregated(ctx context.Context, userID int64, startTime, endTime time.Time) ([]map[string]any, error) + + // GetRecentAccountUsers returns users who used the account in the last N minutes + GetRecentAccountUsers(ctx context.Context, accountID int64, minutes int) ([]RecentAccountUser, error) + // GetAccountUsersByTimeRange returns users who used the account within the selected time range. + GetAccountUsersByTimeRange(ctx context.Context, accountID int64, startTime, endTime time.Time) ([]RecentAccountUser, error) +} + +// RecentAccountUser represents a user who recently used an account +type RecentAccountUser struct { + UserID int64 `json:"user_id"` + Email string `json:"email"` + Requests int64 `json:"requests"` + AccountCost float64 `json:"account_cost"` + UserCost float64 `json:"user_cost"` + LastUsedAt time.Time `json:"last_used_at"` } type accountWindowStatsBatchReader interface { @@ -1335,3 +1350,13 @@ func buildGeminiUsageProgress(used, limit int64, resetAt time.Time, tokens int64 func (s *AccountUsageService) GetAccountWindowStats(ctx context.Context, accountID int64, startTime time.Time) (*usagestats.AccountStats, error) { return s.usageLogRepo.GetAccountWindowStats(ctx, accountID, startTime) } + +// GetRecentAccountUsers returns users who used the account in the last N minutes +func (s *AccountUsageService) GetRecentAccountUsers(ctx context.Context, accountID int64, minutes int) ([]RecentAccountUser, error) { + return s.usageLogRepo.GetRecentAccountUsers(ctx, accountID, minutes) +} + +// GetAccountUsersByTimeRange returns users who used the account within the selected time range. +func (s *AccountUsageService) GetAccountUsersByTimeRange(ctx context.Context, accountID int64, startTime, endTime time.Time) ([]RecentAccountUser, error) { + return s.usageLogRepo.GetAccountUsersByTimeRange(ctx, accountID, startTime, endTime) +} diff --git a/frontend/src/api/admin/accounts.ts b/frontend/src/api/admin/accounts.ts index 8a1277930c3..3ef53555d04 100644 --- a/frontend/src/api/admin/accounts.ts +++ b/frontend/src/api/admin/accounts.ts @@ -630,6 +630,32 @@ export async function setPrivacy(id: number): Promise { return data } +/** + * Recent account user type + */ +export interface RecentAccountUser { + user_id: number + email: string + requests: number + account_cost: number + user_cost: number + last_used_at: string +} + +/** + * Get recent users of an account (last 5 minutes) + * @param id - Account ID + * @returns List of recent users + */ +export async function getRecentUsers(id: number, params?: { + start_date?: string + end_date?: string + timezone?: string +}): Promise<{ users: RecentAccountUser[] }> { + const { data } = await apiClient.get<{ users: RecentAccountUser[] }>(`/admin/accounts/${id}/recent-users`, { params }) + return data +} + export const accountsAPI = { list, listWithEtag, @@ -666,7 +692,8 @@ export const accountsAPI = { getAntigravityDefaultModelMapping, batchClearError, batchRefresh, - setPrivacy + setPrivacy, + getRecentUsers } export default accountsAPI diff --git a/frontend/src/components/layout/AppSidebar.vue b/frontend/src/components/layout/AppSidebar.vue index 3d7f1604c7e..b0b258165cc 100644 --- a/frontend/src/components/layout/AppSidebar.vue +++ b/frontend/src/components/layout/AppSidebar.vue @@ -733,6 +733,7 @@ const adminNavItems = computed((): NavItem[] => { }, { path: '/admin/subscriptions', label: t('nav.subscriptions'), icon: CreditCardIcon, hideInSimpleMode: true }, { path: '/admin/accounts', label: t('nav.accounts'), icon: GlobeIcon }, + { path: '/admin/account-stats', label: t('nav.accountStats'), icon: ChartIcon }, { path: '/admin/announcements', label: t('nav.announcements'), icon: BellIcon }, { path: '/admin/proxies', label: t('nav.proxies'), icon: ServerIcon }, { path: '/admin/risk-control', label: t('nav.riskControl'), icon: ShieldIcon, hideInSimpleMode: true, featureFlag: flagRiskControl }, diff --git a/frontend/src/i18n/locales/en.ts b/frontend/src/i18n/locales/en.ts index 90bf23f730b..fed0d64766c 100644 --- a/frontend/src/i18n/locales/en.ts +++ b/frontend/src/i18n/locales/en.ts @@ -358,6 +358,7 @@ export default { availableChannels: 'Available Channels', subscriptions: 'Subscriptions', accounts: 'Accounts', + accountStats: 'Account Stats', proxies: 'Proxies', redeemCodes: 'Redeem Codes', ops: 'Ops', @@ -3808,6 +3809,51 @@ export default { usageError: 'Fetch Error' }, + // Account Stats + accountStats: { + title: 'Account Statistics', + description: 'View usage statistics by account', + timeRange: 'Time Range', + autoRefresh: 'Auto Refresh', + enableAutoRefresh: 'Enable Auto Refresh', + refreshInterval5s: '5 seconds', + refreshInterval10s: '10 seconds', + refreshInterval15s: '15 seconds', + refreshInterval30s: '30 seconds', + autoRefreshCountdown: 'Auto refresh: {seconds}s', + account: 'Account', + platform: 'Platform', + capacity: 'Concurrency / Capacity', + requests: 'Requests', + tokens: 'Tokens', + cost: 'Cost', + accountBilling: 'Account Billing', + userCharge: 'User Charge', + actions: 'Actions', + viewDetail: 'View Detail', + noAccounts: 'No account data', + rangeUsers: 'Active Users in Current Page Time Range', + noRangeUsers: 'No active users in this time range', + recentUsers: 'Recent Active Users (5 min)', + noRecentUsers: 'No active users', + user: 'User', + email: 'Email', + requestCount: 'Requests', + currentRequests: 'Current Requests', + lastUsedAt: 'Last Used', + activeNow: 'Active', + accountDetail: 'Account Detail', + usageSummary: 'Usage Summary', + totalRequests: 'Total Requests', + totalTokens: 'Total Tokens', + totalCost: 'Total Cost', + inputTokens: 'Input Tokens', + outputTokens: 'Output Tokens', + allPlatforms: 'All Platforms', + allGroups: 'All Groups', + ungroupedGroup: 'Ungrouped', + }, + // Scheduled Tests scheduledTests: { title: 'Scheduled Tests', diff --git a/frontend/src/i18n/locales/zh.ts b/frontend/src/i18n/locales/zh.ts index 87482f9d329..17d6c0df3c3 100644 --- a/frontend/src/i18n/locales/zh.ts +++ b/frontend/src/i18n/locales/zh.ts @@ -358,6 +358,7 @@ export default { availableChannels: '可用渠道', subscriptions: '订阅管理', accounts: '账号管理', + accountStats: '账号统计', proxies: 'IP管理', redeemCodes: '兑换码', ops: '运维监控', @@ -3903,6 +3904,51 @@ export default { } }, + // Account Stats + accountStats: { + title: '账号统计', + description: '以账号为维度查看使用统计', + timeRange: '时间范围', + autoRefresh: '自动刷新', + enableAutoRefresh: '启用自动刷新', + refreshInterval5s: '5 秒', + refreshInterval10s: '10 秒', + refreshInterval15s: '15 秒', + refreshInterval30s: '30 秒', + autoRefreshCountdown: '自动刷新:{seconds}s', + account: '账号', + platform: '平台', + capacity: '并发/容量', + requests: '请求数', + tokens: 'Token', + cost: '费用', + accountBilling: '账号计费', + userCharge: '用户扣费', + actions: '操作', + viewDetail: '查看详情', + noAccounts: '暂无账号数据', + rangeUsers: '按照本页时间范围筛选的活跃用户', + noRangeUsers: '本页时间范围内暂无活跃用户', + recentUsers: '最近活跃用户(5分钟内)', + noRecentUsers: '暂无活跃用户', + user: '用户', + email: '邮箱', + requestCount: '请求数', + currentRequests: '当前请求', + lastUsedAt: '最后使用', + activeNow: '活跃中', + accountDetail: '账号详情', + usageSummary: '使用汇总', + totalRequests: '总请求数', + totalTokens: '总 Token', + totalCost: '总费用', + inputTokens: '输入 Token', + outputTokens: '输出 Token', + allPlatforms: '全部平台', + allGroups: '全部分组', + ungroupedGroup: '未分组', + }, + // Scheduled Tests scheduledTests: { title: '定时测试', diff --git a/frontend/src/router/index.ts b/frontend/src/router/index.ts index 9e7111a4bc5..6ba46940c84 100644 --- a/frontend/src/router/index.ts +++ b/frontend/src/router/index.ts @@ -455,6 +455,18 @@ const routes: RouteRecordRaw[] = [ descriptionKey: 'admin.accounts.description' } }, + { + path: '/admin/account-stats', + name: 'AdminAccountStats', + component: () => import('@/views/admin/AccountStatsView.vue'), + meta: { + requiresAuth: true, + requiresAdmin: true, + title: 'Account Statistics', + titleKey: 'admin.accountStats.title', + descriptionKey: 'admin.accountStats.description' + } + }, { path: '/admin/announcements', name: 'AdminAnnouncements', diff --git a/frontend/src/views/admin/AccountStatsView.vue b/frontend/src/views/admin/AccountStatsView.vue new file mode 100644 index 00000000000..4ffc18b12d1 --- /dev/null +++ b/frontend/src/views/admin/AccountStatsView.vue @@ -0,0 +1,607 @@ + + + From 7a9ba78c037b3025e5a6228ed2e868e051fe415a Mon Sep 17 00:00:00 2001 From: Qi HU Date: Wed, 6 May 2026 21:44:12 +0800 Subject: [PATCH 2/2] fix: show realtime active account users --- backend/cmd/server/wire_gen.go | 2 +- backend/internal/handler/gateway_handler.go | 3 +- .../gateway_handler_chat_completions.go | 3 +- .../handler/gateway_handler_responses.go | 3 +- ...eway_handler_warmup_intercept_unit_test.go | 3 + .../handler/gateway_helper_fastpath_test.go | 4 + .../handler/gateway_helper_hotpath_test.go | 4 + .../internal/handler/gemini_v1beta_handler.go | 3 +- .../handler/openai_gateway_handler.go | 3 + backend/internal/pkg/ctxkey/ctxkey.go | 3 + .../internal/repository/concurrency_cache.go | 34 ++++++++ .../internal/service/account_usage_service.go | 87 +++++++++++++++++-- .../internal/service/concurrency_service.go | 82 ++++++++++++++++- .../service/concurrency_service_test.go | 4 + .../service/gateway_multiplatform_test.go | 4 + backend/internal/service/gateway_service.go | 5 ++ .../service/openai_account_scheduler_test.go | 4 + .../service/openai_gateway_service_test.go | 4 + backend/internal/service/wire.go | 20 ++++- backend/internal/testutil/stubs.go | 3 + frontend/src/api/admin/accounts.ts | 1 + frontend/src/views/admin/AccountStatsView.vue | 39 ++++----- 22 files changed, 280 insertions(+), 38 deletions(-) diff --git a/backend/cmd/server/wire_gen.go b/backend/cmd/server/wire_gen.go index a550118139e..d2ebac19644 100644 --- a/backend/cmd/server/wire_gen.go +++ b/backend/cmd/server/wire_gen.go @@ -142,7 +142,7 @@ func initializeApplication(buildInfo handler.BuildInfo) (*Application, error) { tlsFingerprintProfileRepository := repository.NewTLSFingerprintProfileRepository(client) tlsFingerprintProfileCache := repository.NewTLSFingerprintProfileCache(redisClient) tlsFingerprintProfileService := service.NewTLSFingerprintProfileService(tlsFingerprintProfileRepository, tlsFingerprintProfileCache) - accountUsageService := service.NewAccountUsageService(accountRepository, usageLogRepository, claudeUsageFetcher, geminiQuotaService, antigravityQuotaFetcher, usageCache, identityCache, tlsFingerprintProfileService) + accountUsageService := service.ProvideAccountUsageService(accountRepository, usageLogRepository, claudeUsageFetcher, geminiQuotaService, antigravityQuotaFetcher, usageCache, identityCache, tlsFingerprintProfileService, concurrencyService, userRepository) oAuthRefreshAPI := service.ProvideOAuthRefreshAPI(accountRepository, geminiTokenCache) geminiTokenProvider := service.ProvideGeminiTokenProvider(accountRepository, geminiTokenCache, geminiOAuthService, oAuthRefreshAPI) claudeTokenProvider := service.ProvideClaudeTokenProvider(accountRepository, geminiTokenCache, oAuthService, oAuthRefreshAPI) diff --git a/backend/internal/handler/gateway_handler.go b/backend/internal/handler/gateway_handler.go index 65836a7e452..1cfaa540bb8 100644 --- a/backend/internal/handler/gateway_handler.go +++ b/backend/internal/handler/gateway_handler.go @@ -125,6 +125,7 @@ func (h *GatewayHandler) Messages(c *gin.Context) { h.errorResponse(c, http.StatusInternalServerError, "api_error", "User context not found") return } + c.Request = c.Request.WithContext(service.WithSub2APIUserID(c.Request.Context(), subject.UserID)) reqLog := requestLogger( c, "handler.gateway.messages", @@ -322,7 +323,7 @@ func (h *GatewayHandler) Messages(c *gin.Context) { } for { - selection, err := h.gatewayService.SelectAccountWithLoadAwareness(c.Request.Context(), apiKey.GroupID, sessionKey, reqModel, fs.FailedAccountIDs, "", int64(0)) // Gemini 不使用会话限制 + selection, err := h.gatewayService.SelectAccountWithLoadAwareness(c.Request.Context(), apiKey.GroupID, sessionKey, reqModel, fs.FailedAccountIDs, "", subject.UserID) // Gemini 不使用会话限制 if err != nil { if len(fs.FailedAccountIDs) == 0 { reqLog.Warn("gateway.select_account_no_available", diff --git a/backend/internal/handler/gateway_handler_chat_completions.go b/backend/internal/handler/gateway_handler_chat_completions.go index c6b73190367..158a371f379 100644 --- a/backend/internal/handler/gateway_handler_chat_completions.go +++ b/backend/internal/handler/gateway_handler_chat_completions.go @@ -36,6 +36,7 @@ func (h *GatewayHandler) ChatCompletions(c *gin.Context) { h.chatCompletionsErrorResponse(c, http.StatusInternalServerError, "api_error", "User context not found") return } + c.Request = c.Request.WithContext(service.WithSub2APIUserID(c.Request.Context(), subject.UserID)) reqLog := requestLogger( c, "handler.gateway.chat_completions", @@ -166,7 +167,7 @@ func (h *GatewayHandler) ChatCompletions(c *gin.Context) { fs := NewFailoverState(h.maxAccountSwitches, false) for { - selection, err := h.gatewayService.SelectAccountWithLoadAwareness(c.Request.Context(), apiKey.GroupID, sessionHash, reqModel, fs.FailedAccountIDs, "", int64(0)) + selection, err := h.gatewayService.SelectAccountWithLoadAwareness(c.Request.Context(), apiKey.GroupID, sessionHash, reqModel, fs.FailedAccountIDs, "", subject.UserID) if err != nil { if len(fs.FailedAccountIDs) == 0 { h.chatCompletionsErrorResponse(c, http.StatusServiceUnavailable, "api_error", "No available accounts: "+err.Error()) diff --git a/backend/internal/handler/gateway_handler_responses.go b/backend/internal/handler/gateway_handler_responses.go index a97f572d2a8..5c631fd9ce9 100644 --- a/backend/internal/handler/gateway_handler_responses.go +++ b/backend/internal/handler/gateway_handler_responses.go @@ -36,6 +36,7 @@ func (h *GatewayHandler) Responses(c *gin.Context) { h.responsesErrorResponse(c, http.StatusInternalServerError, "api_error", "User context not found") return } + c.Request = c.Request.WithContext(service.WithSub2APIUserID(c.Request.Context(), subject.UserID)) reqLog := requestLogger( c, "handler.gateway.responses", @@ -171,7 +172,7 @@ func (h *GatewayHandler) Responses(c *gin.Context) { fs := NewFailoverState(h.maxAccountSwitches, false) for { - selection, err := h.gatewayService.SelectAccountWithLoadAwareness(c.Request.Context(), apiKey.GroupID, sessionHash, reqModel, fs.FailedAccountIDs, "", int64(0)) + selection, err := h.gatewayService.SelectAccountWithLoadAwareness(c.Request.Context(), apiKey.GroupID, sessionHash, reqModel, fs.FailedAccountIDs, "", subject.UserID) if err != nil { if len(fs.FailedAccountIDs) == 0 { h.responsesErrorResponse(c, http.StatusServiceUnavailable, "api_error", "No available accounts: "+err.Error()) diff --git a/backend/internal/handler/gateway_handler_warmup_intercept_unit_test.go b/backend/internal/handler/gateway_handler_warmup_intercept_unit_test.go index 57554cf976b..1579f7bf4f2 100644 --- a/backend/internal/handler/gateway_handler_warmup_intercept_unit_test.go +++ b/backend/internal/handler/gateway_handler_warmup_intercept_unit_test.go @@ -138,6 +138,9 @@ func (f *fakeConcurrencyCache) GetAccountConcurrencyBatch(_ context.Context, acc } func (f *fakeConcurrencyCache) CleanupExpiredAccountSlots(context.Context, int64) error { return nil } func (f *fakeConcurrencyCache) CleanupStaleProcessSlots(context.Context, string) error { return nil } +func (f *fakeConcurrencyCache) GetAccountActiveUserConcurrency(context.Context, int64) (map[int64]int, error) { + return nil, nil +} func newTestGatewayHandler(t *testing.T, group *service.Group, accounts []*service.Account) (*GatewayHandler, func()) { t.Helper() diff --git a/backend/internal/handler/gateway_helper_fastpath_test.go b/backend/internal/handler/gateway_helper_fastpath_test.go index c7c0fb6c9ec..1a1287f9200 100644 --- a/backend/internal/handler/gateway_helper_fastpath_test.go +++ b/backend/internal/handler/gateway_helper_fastpath_test.go @@ -93,6 +93,10 @@ func (m *concurrencyCacheMock) CleanupStaleProcessSlots(ctx context.Context, act return nil } +func (m *concurrencyCacheMock) GetAccountActiveUserConcurrency(ctx context.Context, accountID int64) (map[int64]int, error) { + return nil, nil +} + func TestConcurrencyHelper_TryAcquireUserSlot(t *testing.T) { cache := &concurrencyCacheMock{ acquireUserSlotFn: func(ctx context.Context, userID int64, maxConcurrency int, requestID string) (bool, error) { diff --git a/backend/internal/handler/gateway_helper_hotpath_test.go b/backend/internal/handler/gateway_helper_hotpath_test.go index 4a677199806..15c2f415b8b 100644 --- a/backend/internal/handler/gateway_helper_hotpath_test.go +++ b/backend/internal/handler/gateway_helper_hotpath_test.go @@ -124,6 +124,10 @@ func (s *helperConcurrencyCacheStub) CleanupStaleProcessSlots(ctx context.Contex return nil } +func (s *helperConcurrencyCacheStub) GetAccountActiveUserConcurrency(ctx context.Context, accountID int64) (map[int64]int, error) { + return nil, nil +} + func newHelperTestContext(method, path string) (*gin.Context, *httptest.ResponseRecorder) { gin.SetMode(gin.TestMode) rec := httptest.NewRecorder() diff --git a/backend/internal/handler/gemini_v1beta_handler.go b/backend/internal/handler/gemini_v1beta_handler.go index 90ebe9ecc69..9d45e102889 100644 --- a/backend/internal/handler/gemini_v1beta_handler.go +++ b/backend/internal/handler/gemini_v1beta_handler.go @@ -143,6 +143,7 @@ func (h *GatewayHandler) GeminiV1BetaModels(c *gin.Context) { googleError(c, http.StatusInternalServerError, "User context not found") return } + c.Request = c.Request.WithContext(service.WithSub2APIUserID(c.Request.Context(), authSubject.UserID)) reqLog := requestLogger( c, "handler.gemini_v1beta.models", @@ -369,7 +370,7 @@ func (h *GatewayHandler) GeminiV1BetaModels(c *gin.Context) { } for { - selection, err := h.gatewayService.SelectAccountWithLoadAwareness(c.Request.Context(), apiKey.GroupID, sessionKey, modelName, fs.FailedAccountIDs, "", int64(0)) // Gemini 不使用会话限制 + selection, err := h.gatewayService.SelectAccountWithLoadAwareness(c.Request.Context(), apiKey.GroupID, sessionKey, modelName, fs.FailedAccountIDs, "", authSubject.UserID) // Gemini 不使用会话限制 if err != nil { if len(fs.FailedAccountIDs) == 0 { googleError(c, http.StatusServiceUnavailable, "No available Gemini accounts: "+err.Error()) diff --git a/backend/internal/handler/openai_gateway_handler.go b/backend/internal/handler/openai_gateway_handler.go index 6b07b7ba70b..fd2470f910e 100644 --- a/backend/internal/handler/openai_gateway_handler.go +++ b/backend/internal/handler/openai_gateway_handler.go @@ -113,6 +113,7 @@ func (h *OpenAIGatewayHandler) Responses(c *gin.Context) { if !h.ensureResponsesDependencies(c, reqLog) { return } + c.Request = c.Request.WithContext(service.WithSub2APIUserID(c.Request.Context(), subject.UserID)) // Read request body body, err := pkghttputil.ReadRequestBodyWithPrealloc(c.Request) @@ -572,6 +573,7 @@ func (h *OpenAIGatewayHandler) Messages(c *gin.Context) { if !h.ensureResponsesDependencies(c, reqLog) { return } + c.Request = c.Request.WithContext(service.WithSub2APIUserID(c.Request.Context(), subject.UserID)) body, err := pkghttputil.ReadRequestBodyWithPrealloc(c.Request) if err != nil { @@ -1097,6 +1099,7 @@ func (h *OpenAIGatewayHandler) ResponsesWebSocket(c *gin.Context) { if !h.ensureResponsesDependencies(c, reqLog) { return } + c.Request = c.Request.WithContext(service.WithSub2APIUserID(c.Request.Context(), subject.UserID)) reqLog.Info("openai.websocket_ingress_started") clientIP := ip.GetClientIP(c) userAgent := strings.TrimSpace(c.GetHeader("User-Agent")) diff --git a/backend/internal/pkg/ctxkey/ctxkey.go b/backend/internal/pkg/ctxkey/ctxkey.go index 25782c55172..9038bd9601c 100644 --- a/backend/internal/pkg/ctxkey/ctxkey.go +++ b/backend/internal/pkg/ctxkey/ctxkey.go @@ -55,4 +55,7 @@ const ( // ClaudeCodeVersion stores the extracted Claude Code version from User-Agent (e.g. "2.1.22") ClaudeCodeVersion Key = "ctx_claude_code_version" + + // Sub2APIUserID 当前请求的系统用户 ID,用于账号并发槽位中编码用户信息以支持实时活跃用户统计 + Sub2APIUserID Key = "ctx_sub2api_user_id" ) diff --git a/backend/internal/repository/concurrency_cache.go b/backend/internal/repository/concurrency_cache.go index 8732b2cea19..9dceab708e4 100644 --- a/backend/internal/repository/concurrency_cache.go +++ b/backend/internal/repository/concurrency_cache.go @@ -257,6 +257,40 @@ func (c *concurrencyCache) GetAccountConcurrency(ctx context.Context, accountID return result, nil } +func (c *concurrencyCache) GetAccountActiveUserConcurrency(ctx context.Context, accountID int64) (map[int64]int, error) { + key := accountSlotKey(accountID) + + // Use server time to clean expired slots, then ZRANGE all current members + now, err := c.rdb.Time(ctx).Result() + if err != nil { + return nil, fmt.Errorf("redis TIME: %w", err) + } + cutoffTime := now.Unix() - int64(c.slotTTLSeconds) + + // Pipeline: clean expired + get all members + pipe := c.rdb.Pipeline() + pipe.ZRemRangeByScore(ctx, key, "-inf", strconv.FormatInt(cutoffTime, 10)) + membersCmd := pipe.ZRange(ctx, key, 0, -1) + if _, err := pipe.Exec(ctx); err != nil && !errors.Is(err, redis.Nil) { + return nil, fmt.Errorf("pipeline exec: %w", err) + } + + members, err := membersCmd.Result() + if err != nil && !errors.Is(err, redis.Nil) { + return nil, fmt.Errorf("zrange: %w", err) + } + + result := make(map[int64]int, len(members)) + for _, member := range members { + userID := service.ParseAccountSlotMemberUserID(member) + if userID <= 0 { + continue + } + result[userID]++ + } + return result, nil +} + func (c *concurrencyCache) GetAccountConcurrencyBatch(ctx context.Context, accountIDs []int64) (map[int64]int, error) { if len(accountIDs) == 0 { return map[int64]int{}, nil diff --git a/backend/internal/service/account_usage_service.go b/backend/internal/service/account_usage_service.go index 99d9853d536..398b6d7f79c 100644 --- a/backend/internal/service/account_usage_service.go +++ b/backend/internal/service/account_usage_service.go @@ -9,6 +9,7 @@ import ( "log/slog" "math/rand/v2" "net/http" + "sort" "strings" "sync" "time" @@ -86,12 +87,13 @@ type UsageLogRepository interface { // RecentAccountUser represents a user who recently used an account type RecentAccountUser struct { - UserID int64 `json:"user_id"` - Email string `json:"email"` - Requests int64 `json:"requests"` - AccountCost float64 `json:"account_cost"` - UserCost float64 `json:"user_cost"` - LastUsedAt time.Time `json:"last_used_at"` + UserID int64 `json:"user_id"` + Email string `json:"email"` + Requests int64 `json:"requests"` + AccountCost float64 `json:"account_cost"` + UserCost float64 `json:"user_cost"` + LastUsedAt time.Time `json:"last_used_at"` + CurrentRequests int64 `json:"current_requests"` } type accountWindowStatsBatchReader interface { @@ -281,6 +283,8 @@ type AccountUsageService struct { cache *UsageCache identityCache IdentityCache tlsFPProfileService *TLSFingerprintProfileService + concurrencyService *ConcurrencyService + userRepo UserRepository } // NewAccountUsageService 创建AccountUsageService实例 @@ -306,6 +310,18 @@ func NewAccountUsageService( } } +// SetConcurrencyService sets the concurrency service for real-time active user tracking. +// Called after construction to avoid circular dependency. +func (s *AccountUsageService) SetConcurrencyService(cs *ConcurrencyService) { + s.concurrencyService = cs +} + +// SetUserRepository sets the user repository for resolving user emails. +// Called after construction to avoid circular dependency. +func (s *AccountUsageService) SetUserRepository(repo UserRepository) { + s.userRepo = repo +} + // GetUsage 获取账号使用量 // OAuth账号: 调用Anthropic API获取真实数据(需要profile scope),API响应缓存10分钟,窗口统计缓存1分钟 // Setup Token账号: 根据session_window推算5h窗口,7d数据不可用(没有profile scope) @@ -1351,9 +1367,64 @@ func (s *AccountUsageService) GetAccountWindowStats(ctx context.Context, account return s.usageLogRepo.GetAccountWindowStats(ctx, accountID, startTime) } -// GetRecentAccountUsers returns users who used the account in the last N minutes +// GetRecentAccountUsers returns users who used the account in the last N minutes, +// merged with real-time in-flight request data from Redis concurrency slots. +// Users with current requests appear even if they have no usage log entry. func (s *AccountUsageService) GetRecentAccountUsers(ctx context.Context, accountID int64, minutes int) ([]RecentAccountUser, error) { - return s.usageLogRepo.GetRecentAccountUsers(ctx, accountID, minutes) + // 1. Get usage log recent users + logUsers, err := s.usageLogRepo.GetRecentAccountUsers(ctx, accountID, minutes) + if err != nil { + return nil, err + } + + // 2. Get real-time active user concurrency from Redis + var activeUsers map[int64]int + if s.concurrencyService != nil { + activeUsers, _ = s.concurrencyService.GetAccountActiveUserConcurrency(ctx, accountID) + } + + if len(activeUsers) == 0 { + return logUsers, nil + } + + // 3. Build map of existing log users by userID + userMap := make(map[int64]*RecentAccountUser, len(logUsers)) + for i := range logUsers { + userMap[logUsers[i].UserID] = &logUsers[i] + } + + // 4. Merge: set CurrentRequests on existing users, add new Redis-only users + for userID, count := range activeUsers { + if existing, ok := userMap[userID]; ok { + existing.CurrentRequests = int64(count) + } else { + // Redis-only user: resolve email from UserRepository + email := "" + if s.userRepo != nil { + if user, userErr := s.userRepo.GetByID(ctx, userID); userErr == nil && user != nil { + email = user.Email + } + } + newUser := RecentAccountUser{ + UserID: userID, + Email: email, + CurrentRequests: int64(count), + LastUsedAt: time.Now(), + } + userMap[userID] = &newUser + logUsers = append(logUsers, newUser) + } + } + + // 5. Sort: current_requests desc first, then last_used_at desc + sort.Slice(logUsers, func(i, j int) bool { + if logUsers[i].CurrentRequests != logUsers[j].CurrentRequests { + return logUsers[i].CurrentRequests > logUsers[j].CurrentRequests + } + return logUsers[i].LastUsedAt.After(logUsers[j].LastUsedAt) + }) + + return logUsers, nil } // GetAccountUsersByTimeRange returns users who used the account within the selected time range. diff --git a/backend/internal/service/concurrency_service.go b/backend/internal/service/concurrency_service.go index 386d5ed05df..4566554d8c3 100644 --- a/backend/internal/service/concurrency_service.go +++ b/backend/internal/service/concurrency_service.go @@ -6,22 +6,79 @@ import ( "encoding/binary" "os" "strconv" + "strings" "sync/atomic" "time" + "github.com/Wei-Shaw/sub2api/internal/pkg/ctxkey" "github.com/Wei-Shaw/sub2api/internal/pkg/logger" ) +// EncodeAccountSlotMember encodes a requestID with optional userID metadata. +// Format: requestID|u:{userID} (userID > 0) or plain requestID (userID <= 0). +// The requestID prefix is kept first so startup cleanup by process prefix continues to work. +func EncodeAccountSlotMember(requestID string, userID int64) string { + if userID <= 0 { + return requestID + } + return requestID + "|u:" + strconv.FormatInt(userID, 10) +} + +// ParseAccountSlotMemberUserID extracts the userID from an encoded account slot member. +// Returns 0 if no userID metadata is present or parsing fails. +func ParseAccountSlotMemberUserID(member string) int64 { + idx := strings.Index(member, "|u:") + if idx < 0 { + return 0 + } + uid, err := strconv.ParseInt(member[idx+3:], 10, 64) + if err != nil { + return 0 + } + return uid +} + +// ExtractRequestIDFromMember extracts the requestID portion from an encoded member. +func ExtractRequestIDFromMember(member string) string { + if idx := strings.Index(member, "|u:"); idx >= 0 { + return member[:idx] + } + return member +} + +// WithSub2APIUserID stores the current system user ID in context for account-slot tracking. +func WithSub2APIUserID(ctx context.Context, userID int64) context.Context { + if userID <= 0 { + return ctx + } + return context.WithValue(ctx, ctxkey.Sub2APIUserID, userID) +} + +// Sub2APIUserIDFromContext extracts the current system user ID from context. +func Sub2APIUserIDFromContext(ctx context.Context) int64 { + if ctx == nil { + return 0 + } + if uid, ok := ctx.Value(ctxkey.Sub2APIUserID).(int64); ok { + return uid + } + return 0 +} + // ConcurrencyCache 定义并发控制的缓存接口 // 使用有序集合存储槽位,按时间戳清理过期条目 type ConcurrencyCache interface { // 账号槽位管理 - // 键格式: concurrency:account:{accountID}(有序集合,成员为 requestID) + // 键格式: concurrency:account:{accountID}(有序集合,成员为 requestID 或 requestID|u:{userID}) AcquireAccountSlot(ctx context.Context, accountID int64, maxConcurrency int, requestID string) (bool, error) ReleaseAccountSlot(ctx context.Context, accountID int64, requestID string) error GetAccountConcurrency(ctx context.Context, accountID int64) (int, error) GetAccountConcurrencyBatch(ctx context.Context, accountIDs []int64) (map[int64]int, error) + // GetAccountActiveUserConcurrency 返回账号当前活跃用户的并发数映射。 + // 清理过期槽位后,解析成员中的 userID 元数据并聚合计数,跳过 userID<=0 的成员。 + GetAccountActiveUserConcurrency(ctx context.Context, accountID int64) (map[int64]int, error) + // 账号等待队列(账号级) IncrementAccountWaitCount(ctx context.Context, accountID int64, maxWait int) (bool, error) DecrementAccountWaitCount(ctx context.Context, accountID int64) error @@ -127,6 +184,13 @@ type UserLoadInfo struct { // If the account is at max concurrency, it waits until a slot is available or timeout. // Returns a release function that MUST be called when the request completes. func (s *ConcurrencyService) AcquireAccountSlot(ctx context.Context, accountID int64, maxConcurrency int) (*AcquireResult, error) { + return s.AcquireAccountSlotForUser(ctx, accountID, Sub2APIUserIDFromContext(ctx), maxConcurrency) +} + +// AcquireAccountSlotForUser attempts to acquire a concurrency slot for an account, +// encoding the userID into the slot member for real-time active user tracking. +// If userID <= 0, behaves identically to AcquireAccountSlot (no user metadata). +func (s *ConcurrencyService) AcquireAccountSlotForUser(ctx context.Context, accountID int64, userID int64, maxConcurrency int) (*AcquireResult, error) { // If maxConcurrency is 0 or negative, no limit if maxConcurrency <= 0 { return &AcquireResult{ @@ -137,8 +201,10 @@ func (s *ConcurrencyService) AcquireAccountSlot(ctx context.Context, accountID i // Generate unique request ID for this slot requestID := generateRequestID() + // Encode userID into the member for active user tracking + member := EncodeAccountSlotMember(requestID, userID) - acquired, err := s.cache.AcquireAccountSlot(ctx, accountID, maxConcurrency, requestID) + acquired, err := s.cache.AcquireAccountSlot(ctx, accountID, maxConcurrency, member) if err != nil { return nil, err } @@ -149,7 +215,7 @@ func (s *ConcurrencyService) AcquireAccountSlot(ctx context.Context, accountID i ReleaseFunc: func() { bgCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - if err := s.cache.ReleaseAccountSlot(bgCtx, accountID, requestID); err != nil { + if err := s.cache.ReleaseAccountSlot(bgCtx, accountID, member); err != nil { logger.LegacyPrintf("service.concurrency", "Warning: failed to release account slot for %d (req=%s): %v", accountID, requestID, err) } }, @@ -365,3 +431,13 @@ func (s *ConcurrencyService) GetAccountConcurrencyBatch(ctx context.Context, acc return s.cache.GetAccountConcurrencyBatch(redisCtx, accountIDs) } + +// GetAccountActiveUserConcurrency returns a map of userID -> current in-flight request count +// for the given account. It cleans expired slots, parses userID metadata from members, +// and aggregates counts. Members without userID metadata (userID <= 0) are skipped. +func (s *ConcurrencyService) GetAccountActiveUserConcurrency(ctx context.Context, accountID int64) (map[int64]int, error) { + if s.cache == nil { + return map[int64]int{}, nil + } + return s.cache.GetAccountActiveUserConcurrency(ctx, accountID) +} diff --git a/backend/internal/service/concurrency_service_test.go b/backend/internal/service/concurrency_service_test.go index 078ba0dc170..f054e1e5909 100644 --- a/backend/internal/service/concurrency_service_test.go +++ b/backend/internal/service/concurrency_service_test.go @@ -95,6 +95,10 @@ func (c *stubConcurrencyCacheForTest) CleanupStaleProcessSlots(_ context.Context return c.cleanupErr } +func (c *stubConcurrencyCacheForTest) GetAccountActiveUserConcurrency(_ context.Context, _ int64) (map[int64]int, error) { + return nil, nil +} + type trackingConcurrencyCache struct { stubConcurrencyCacheForTest cleanupPrefix string diff --git a/backend/internal/service/gateway_multiplatform_test.go b/backend/internal/service/gateway_multiplatform_test.go index 728328373c6..3e32dcac9e5 100644 --- a/backend/internal/service/gateway_multiplatform_test.go +++ b/backend/internal/service/gateway_multiplatform_test.go @@ -1990,6 +1990,10 @@ func (m *mockConcurrencyCache) CleanupStaleProcessSlots(ctx context.Context, act return nil } +func (m *mockConcurrencyCache) GetAccountActiveUserConcurrency(ctx context.Context, accountID int64) (map[int64]int, error) { + return nil, nil +} + func (m *mockConcurrencyCache) GetUsersLoadBatch(ctx context.Context, users []UserWithConcurrency) (map[int64]*UserLoadInfo, error) { result := make(map[int64]*UserLoadInfo, len(users)) for _, user := range users { diff --git a/backend/internal/service/gateway_service.go b/backend/internal/service/gateway_service.go index 3a003bd23cb..74b1b4fceb8 100644 --- a/backend/internal/service/gateway_service.go +++ b/backend/internal/service/gateway_service.go @@ -1403,6 +1403,11 @@ func (s *GatewayService) SelectAccountForModelWithExclusions(ctx context.Context // metadataUserID: 用于客户端亲和调度,从中提取客户端 ID // sub2apiUserID: 系统用户 ID,用于二维亲和调度 func (s *GatewayService) SelectAccountWithLoadAwareness(ctx context.Context, groupID *int64, sessionHash string, requestedModel string, excludedIDs map[int64]struct{}, metadataUserID string, sub2apiUserID int64) (*AccountSelectionResult, error) { + // Store sub2apiUserID in context so tryAcquireAccountSlot can encode it into slot members + if sub2apiUserID > 0 { + ctx = WithSub2APIUserID(ctx, sub2apiUserID) + } + // 调试日志:记录调度入口参数 excludedIDsList := make([]int64, 0, len(excludedIDs)) for id := range excludedIDs { diff --git a/backend/internal/service/openai_account_scheduler_test.go b/backend/internal/service/openai_account_scheduler_test.go index 0950ee54767..bbeb6bbf514 100644 --- a/backend/internal/service/openai_account_scheduler_test.go +++ b/backend/internal/service/openai_account_scheduler_test.go @@ -79,6 +79,10 @@ func (c schedulerTestConcurrencyCache) ReleaseAccountSlot(ctx context.Context, a return nil } +func (c schedulerTestConcurrencyCache) GetAccountActiveUserConcurrency(ctx context.Context, accountID int64) (map[int64]int, error) { + return map[int64]int{}, nil +} + func (c schedulerTestConcurrencyCache) GetAccountsLoadBatch(ctx context.Context, accounts []AccountWithConcurrency) (map[int64]*AccountLoadInfo, error) { if c.loadBatchErr != nil { return nil, c.loadBatchErr diff --git a/backend/internal/service/openai_gateway_service_test.go b/backend/internal/service/openai_gateway_service_test.go index 84a2fe714eb..cb4148734dd 100644 --- a/backend/internal/service/openai_gateway_service_test.go +++ b/backend/internal/service/openai_gateway_service_test.go @@ -127,6 +127,10 @@ func (c stubConcurrencyCache) ReleaseAccountSlot(ctx context.Context, accountID return nil } +func (c stubConcurrencyCache) GetAccountActiveUserConcurrency(ctx context.Context, accountID int64) (map[int64]int, error) { + return map[int64]int{}, nil +} + func (c stubConcurrencyCache) GetAccountsLoadBatch(ctx context.Context, accounts []AccountWithConcurrency) (map[int64]*AccountLoadInfo, error) { if c.loadBatchErr != nil { return nil, c.loadBatchErr diff --git a/backend/internal/service/wire.go b/backend/internal/service/wire.go index dc96be0c063..5a51a63cdef 100644 --- a/backend/internal/service/wire.go +++ b/backend/internal/service/wire.go @@ -227,6 +227,24 @@ func ProvideRateLimitService( return svc } +func ProvideAccountUsageService( + accountRepo AccountRepository, + usageLogRepo UsageLogRepository, + claudeFetcher ClaudeUsageFetcher, + geminiQuotaService *GeminiQuotaService, + antigravityFetcher *AntigravityQuotaFetcher, + usageCache *UsageCache, + identityCache IdentityCache, + tlsFPProfileService *TLSFingerprintProfileService, + concurrencyService *ConcurrencyService, + userRepo UserRepository, +) *AccountUsageService { + svc := NewAccountUsageService(accountRepo, usageLogRepo, claudeFetcher, geminiQuotaService, antigravityFetcher, usageCache, identityCache, tlsFPProfileService) + svc.SetConcurrencyService(concurrencyService) + svc.SetUserRepository(userRepo) + return svc +} + // ProvideOpsMetricsCollector creates and starts OpsMetricsCollector. func ProvideOpsMetricsCollector( opsRepo OpsRepository, @@ -463,7 +481,7 @@ var ProviderSet = wire.NewSet( ProvideClaudeTokenProvider, NewAntigravityGatewayService, ProvideRateLimitService, - NewAccountUsageService, + ProvideAccountUsageService, NewAccountTestService, ProvideSettingService, NewDataManagementService, diff --git a/backend/internal/testutil/stubs.go b/backend/internal/testutil/stubs.go index bc572e11372..87aad8f2232 100644 --- a/backend/internal/testutil/stubs.go +++ b/backend/internal/testutil/stubs.go @@ -79,6 +79,9 @@ func (c StubConcurrencyCache) CleanupExpiredAccountSlots(_ context.Context, _ in func (c StubConcurrencyCache) CleanupStaleProcessSlots(_ context.Context, _ string) error { return nil } +func (c StubConcurrencyCache) GetAccountActiveUserConcurrency(_ context.Context, _ int64) (map[int64]int, error) { + return nil, nil +} // ============================================================ // StubGatewayCache — service.GatewayCache 的空实现 diff --git a/frontend/src/api/admin/accounts.ts b/frontend/src/api/admin/accounts.ts index 3ef53555d04..6595454ff03 100644 --- a/frontend/src/api/admin/accounts.ts +++ b/frontend/src/api/admin/accounts.ts @@ -637,6 +637,7 @@ export interface RecentAccountUser { user_id: number email: string requests: number + current_requests: number account_cost: number user_cost: number last_used_at: string diff --git a/frontend/src/views/admin/AccountStatsView.vue b/frontend/src/views/admin/AccountStatsView.vue index 4ffc18b12d1..865563fdaa6 100644 --- a/frontend/src/views/admin/AccountStatsView.vue +++ b/frontend/src/views/admin/AccountStatsView.vue @@ -36,7 +36,7 @@