Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion backend/cmd/server/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

43 changes: 43 additions & 0 deletions backend/internal/handler/admin/account_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1667,6 +1667,49 @@ func (h *AccountHandler) GetUsage(c *gin.Context) {
response.Success(c, usage)
}

// GetRecentUsers handles getting recent users of an account
// GET /api/v1/admin/accounts/:id/recent-users
func (h *AccountHandler) GetRecentUsers(c *gin.Context) {
accountID, err := strconv.ParseInt(c.Param("id"), 10, 64)
if err != nil {
response.BadRequest(c, "Invalid account ID")
return
}

var users []service.RecentAccountUser
startDateStr := strings.TrimSpace(c.Query("start_date"))
endDateStr := strings.TrimSpace(c.Query("end_date"))
if startDateStr != "" || endDateStr != "" {
if startDateStr == "" || endDateStr == "" {
response.BadRequest(c, "start_date and end_date are required together")
return
}
userTZ := c.Query("timezone")
startTime, parseErr := timezone.ParseInUserLocation("2006-01-02", startDateStr, userTZ)
if parseErr != nil {
response.BadRequest(c, "Invalid start_date format, use YYYY-MM-DD")
return
}
endTime, parseErr := timezone.ParseInUserLocation("2006-01-02", endDateStr, userTZ)
if parseErr != nil {
response.BadRequest(c, "Invalid end_date format, use YYYY-MM-DD")
return
}
users, err = h.accountUsageService.GetAccountUsersByTimeRange(c.Request.Context(), accountID, startTime, endTime.AddDate(0, 0, 1))
} else {
users, err = h.accountUsageService.GetRecentAccountUsers(c.Request.Context(), accountID, 5)
}
if err != nil {
response.ErrorFrom(c, err)
return
}
if users == nil {
users = []service.RecentAccountUser{}
}

response.Success(c, gin.H{"users": users})
}

// ClearRateLimit handles clearing account rate limit status
// POST /api/v1/admin/accounts/:id/clear-rate-limit
func (h *AccountHandler) ClearRateLimit(c *gin.Context) {
Expand Down
3 changes: 2 additions & 1 deletion backend/internal/handler/gateway_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
h.errorResponse(c, http.StatusInternalServerError, "api_error", "User context not found")
return
}
c.Request = c.Request.WithContext(service.WithSub2APIUserID(c.Request.Context(), subject.UserID))
reqLog := requestLogger(
c,
"handler.gateway.messages",
Expand Down Expand Up @@ -322,7 +323,7 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
}

for {
selection, err := h.gatewayService.SelectAccountWithLoadAwareness(c.Request.Context(), apiKey.GroupID, sessionKey, reqModel, fs.FailedAccountIDs, "", int64(0)) // Gemini 不使用会话限制
selection, err := h.gatewayService.SelectAccountWithLoadAwareness(c.Request.Context(), apiKey.GroupID, sessionKey, reqModel, fs.FailedAccountIDs, "", subject.UserID) // Gemini 不使用会话限制
if err != nil {
if len(fs.FailedAccountIDs) == 0 {
reqLog.Warn("gateway.select_account_no_available",
Expand Down
3 changes: 2 additions & 1 deletion backend/internal/handler/gateway_handler_chat_completions.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func (h *GatewayHandler) ChatCompletions(c *gin.Context) {
h.chatCompletionsErrorResponse(c, http.StatusInternalServerError, "api_error", "User context not found")
return
}
c.Request = c.Request.WithContext(service.WithSub2APIUserID(c.Request.Context(), subject.UserID))
reqLog := requestLogger(
c,
"handler.gateway.chat_completions",
Expand Down Expand Up @@ -166,7 +167,7 @@ func (h *GatewayHandler) ChatCompletions(c *gin.Context) {
fs := NewFailoverState(h.maxAccountSwitches, false)

for {
selection, err := h.gatewayService.SelectAccountWithLoadAwareness(c.Request.Context(), apiKey.GroupID, sessionHash, reqModel, fs.FailedAccountIDs, "", int64(0))
selection, err := h.gatewayService.SelectAccountWithLoadAwareness(c.Request.Context(), apiKey.GroupID, sessionHash, reqModel, fs.FailedAccountIDs, "", subject.UserID)
if err != nil {
if len(fs.FailedAccountIDs) == 0 {
h.chatCompletionsErrorResponse(c, http.StatusServiceUnavailable, "api_error", "No available accounts: "+err.Error())
Expand Down
3 changes: 2 additions & 1 deletion backend/internal/handler/gateway_handler_responses.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func (h *GatewayHandler) Responses(c *gin.Context) {
h.responsesErrorResponse(c, http.StatusInternalServerError, "api_error", "User context not found")
return
}
c.Request = c.Request.WithContext(service.WithSub2APIUserID(c.Request.Context(), subject.UserID))
reqLog := requestLogger(
c,
"handler.gateway.responses",
Expand Down Expand Up @@ -171,7 +172,7 @@ func (h *GatewayHandler) Responses(c *gin.Context) {
fs := NewFailoverState(h.maxAccountSwitches, false)

for {
selection, err := h.gatewayService.SelectAccountWithLoadAwareness(c.Request.Context(), apiKey.GroupID, sessionHash, reqModel, fs.FailedAccountIDs, "", int64(0))
selection, err := h.gatewayService.SelectAccountWithLoadAwareness(c.Request.Context(), apiKey.GroupID, sessionHash, reqModel, fs.FailedAccountIDs, "", subject.UserID)
if err != nil {
if len(fs.FailedAccountIDs) == 0 {
h.responsesErrorResponse(c, http.StatusServiceUnavailable, "api_error", "No available accounts: "+err.Error())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,9 @@ func (f *fakeConcurrencyCache) GetAccountConcurrencyBatch(_ context.Context, acc
}
func (f *fakeConcurrencyCache) CleanupExpiredAccountSlots(context.Context, int64) error { return nil }
func (f *fakeConcurrencyCache) CleanupStaleProcessSlots(context.Context, string) error { return nil }
func (f *fakeConcurrencyCache) GetAccountActiveUserConcurrency(context.Context, int64) (map[int64]int, error) {
return nil, nil
}

func newTestGatewayHandler(t *testing.T, group *service.Group, accounts []*service.Account) (*GatewayHandler, func()) {
t.Helper()
Expand Down
4 changes: 4 additions & 0 deletions backend/internal/handler/gateway_helper_fastpath_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ func (m *concurrencyCacheMock) CleanupStaleProcessSlots(ctx context.Context, act
return nil
}

func (m *concurrencyCacheMock) GetAccountActiveUserConcurrency(ctx context.Context, accountID int64) (map[int64]int, error) {
return nil, nil
}

func TestConcurrencyHelper_TryAcquireUserSlot(t *testing.T) {
cache := &concurrencyCacheMock{
acquireUserSlotFn: func(ctx context.Context, userID int64, maxConcurrency int, requestID string) (bool, error) {
Expand Down
4 changes: 4 additions & 0 deletions backend/internal/handler/gateway_helper_hotpath_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,10 @@ func (s *helperConcurrencyCacheStub) CleanupStaleProcessSlots(ctx context.Contex
return nil
}

func (s *helperConcurrencyCacheStub) GetAccountActiveUserConcurrency(ctx context.Context, accountID int64) (map[int64]int, error) {
return nil, nil
}

func newHelperTestContext(method, path string) (*gin.Context, *httptest.ResponseRecorder) {
gin.SetMode(gin.TestMode)
rec := httptest.NewRecorder()
Expand Down
3 changes: 2 additions & 1 deletion backend/internal/handler/gemini_v1beta_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ func (h *GatewayHandler) GeminiV1BetaModels(c *gin.Context) {
googleError(c, http.StatusInternalServerError, "User context not found")
return
}
c.Request = c.Request.WithContext(service.WithSub2APIUserID(c.Request.Context(), authSubject.UserID))
reqLog := requestLogger(
c,
"handler.gemini_v1beta.models",
Expand Down Expand Up @@ -369,7 +370,7 @@ func (h *GatewayHandler) GeminiV1BetaModels(c *gin.Context) {
}

for {
selection, err := h.gatewayService.SelectAccountWithLoadAwareness(c.Request.Context(), apiKey.GroupID, sessionKey, modelName, fs.FailedAccountIDs, "", int64(0)) // Gemini 不使用会话限制
selection, err := h.gatewayService.SelectAccountWithLoadAwareness(c.Request.Context(), apiKey.GroupID, sessionKey, modelName, fs.FailedAccountIDs, "", authSubject.UserID) // Gemini 不使用会话限制
if err != nil {
if len(fs.FailedAccountIDs) == 0 {
googleError(c, http.StatusServiceUnavailable, "No available Gemini accounts: "+err.Error())
Expand Down
3 changes: 3 additions & 0 deletions backend/internal/handler/openai_gateway_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ func (h *OpenAIGatewayHandler) Responses(c *gin.Context) {
if !h.ensureResponsesDependencies(c, reqLog) {
return
}
c.Request = c.Request.WithContext(service.WithSub2APIUserID(c.Request.Context(), subject.UserID))

// Read request body
body, err := pkghttputil.ReadRequestBodyWithPrealloc(c.Request)
Expand Down Expand Up @@ -572,6 +573,7 @@ func (h *OpenAIGatewayHandler) Messages(c *gin.Context) {
if !h.ensureResponsesDependencies(c, reqLog) {
return
}
c.Request = c.Request.WithContext(service.WithSub2APIUserID(c.Request.Context(), subject.UserID))

body, err := pkghttputil.ReadRequestBodyWithPrealloc(c.Request)
if err != nil {
Expand Down Expand Up @@ -1097,6 +1099,7 @@ func (h *OpenAIGatewayHandler) ResponsesWebSocket(c *gin.Context) {
if !h.ensureResponsesDependencies(c, reqLog) {
return
}
c.Request = c.Request.WithContext(service.WithSub2APIUserID(c.Request.Context(), subject.UserID))
reqLog.Info("openai.websocket_ingress_started")
clientIP := ip.GetClientIP(c)
userAgent := strings.TrimSpace(c.GetHeader("User-Agent"))
Expand Down
3 changes: 3 additions & 0 deletions backend/internal/pkg/ctxkey/ctxkey.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,7 @@ const (

// ClaudeCodeVersion stores the extracted Claude Code version from User-Agent (e.g. "2.1.22")
ClaudeCodeVersion Key = "ctx_claude_code_version"

// Sub2APIUserID 当前请求的系统用户 ID,用于账号并发槽位中编码用户信息以支持实时活跃用户统计
Sub2APIUserID Key = "ctx_sub2api_user_id"
)
34 changes: 34 additions & 0 deletions backend/internal/repository/concurrency_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,40 @@ func (c *concurrencyCache) GetAccountConcurrency(ctx context.Context, accountID
return result, nil
}

func (c *concurrencyCache) GetAccountActiveUserConcurrency(ctx context.Context, accountID int64) (map[int64]int, error) {
key := accountSlotKey(accountID)

// Use server time to clean expired slots, then ZRANGE all current members
now, err := c.rdb.Time(ctx).Result()
if err != nil {
return nil, fmt.Errorf("redis TIME: %w", err)
}
cutoffTime := now.Unix() - int64(c.slotTTLSeconds)

// Pipeline: clean expired + get all members
pipe := c.rdb.Pipeline()
pipe.ZRemRangeByScore(ctx, key, "-inf", strconv.FormatInt(cutoffTime, 10))
membersCmd := pipe.ZRange(ctx, key, 0, -1)
if _, err := pipe.Exec(ctx); err != nil && !errors.Is(err, redis.Nil) {
return nil, fmt.Errorf("pipeline exec: %w", err)
}

members, err := membersCmd.Result()
if err != nil && !errors.Is(err, redis.Nil) {
return nil, fmt.Errorf("zrange: %w", err)
}

result := make(map[int64]int, len(members))
for _, member := range members {
userID := service.ParseAccountSlotMemberUserID(member)
if userID <= 0 {
continue
}
result[userID]++
}
return result, nil
}

func (c *concurrencyCache) GetAccountConcurrencyBatch(ctx context.Context, accountIDs []int64) (map[int64]int, error) {
if len(accountIDs) == 0 {
return map[int64]int{}, nil
Expand Down
66 changes: 66 additions & 0 deletions backend/internal/repository/usage_log_repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -4392,3 +4392,69 @@ func setToSlice(set map[int64]struct{}) []int64 {
}
return out
}

// GetRecentAccountUsers returns users who used the account in the last N minutes
func (r *usageLogRepository) GetRecentAccountUsers(ctx context.Context, accountID int64, minutes int) ([]service.RecentAccountUser, error) {
query := `
SELECT
ul.user_id,
COALESCE(u.email, '') as email,
COUNT(*) as requests,
COALESCE(SUM(COALESCE(ul.account_stats_cost, ul.total_cost) * COALESCE(ul.account_rate_multiplier, 1)), 0) as account_cost,
COALESCE(SUM(ul.actual_cost), 0) as user_cost,
MAX(ul.created_at) as last_used_at
FROM usage_logs ul
LEFT JOIN users u ON u.id = ul.user_id
WHERE ul.account_id = $1 AND ul.created_at >= NOW() - make_interval(mins => $2)
GROUP BY ul.user_id, u.email
ORDER BY last_used_at DESC
LIMIT 50
`
rows, err := r.db.QueryContext(ctx, query, accountID, minutes)
if err != nil {
return nil, err
}
defer rows.Close() //nolint:errcheck
var result []service.RecentAccountUser
for rows.Next() {
var u service.RecentAccountUser
if err := rows.Scan(&u.UserID, &u.Email, &u.Requests, &u.AccountCost, &u.UserCost, &u.LastUsedAt); err != nil {
return nil, err
}
result = append(result, u)
}
return result, rows.Err()
}

// GetAccountUsersByTimeRange returns users who used the account within a selected time range.
func (r *usageLogRepository) GetAccountUsersByTimeRange(ctx context.Context, accountID int64, startTime, endTime time.Time) ([]service.RecentAccountUser, error) {
query := `
SELECT
ul.user_id,
COALESCE(u.email, '') as email,
COUNT(*) as requests,
COALESCE(SUM(COALESCE(ul.account_stats_cost, ul.total_cost) * COALESCE(ul.account_rate_multiplier, 1)), 0) as account_cost,
COALESCE(SUM(ul.actual_cost), 0) as user_cost,
MAX(ul.created_at) as last_used_at
FROM usage_logs ul
LEFT JOIN users u ON u.id = ul.user_id
WHERE ul.account_id = $1 AND ul.created_at >= $2 AND ul.created_at < $3
GROUP BY ul.user_id, u.email
ORDER BY requests DESC, last_used_at DESC
LIMIT 200
`
rows, err := r.db.QueryContext(ctx, query, accountID, startTime, endTime)
if err != nil {
return nil, err
}
defer rows.Close() //nolint:errcheck
var result []service.RecentAccountUser
for rows.Next() {
var u service.RecentAccountUser
if err := rows.Scan(&u.UserID, &u.Email, &u.Requests, &u.AccountCost, &u.UserCost, &u.LastUsedAt); err != nil {
return nil, err
}
result = append(result, u)
}
return result, rows.Err()
}
8 changes: 8 additions & 0 deletions backend/internal/server/api_contract_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2388,6 +2388,14 @@ func (r *stubUsageLogRepo) GetAllGroupUsageSummary(ctx context.Context, todaySta
return nil, errors.New("not implemented")
}

func (r *stubUsageLogRepo) GetRecentAccountUsers(ctx context.Context, accountID int64, minutes int) ([]service.RecentAccountUser, error) {
return nil, nil
}

func (r *stubUsageLogRepo) GetAccountUsersByTimeRange(ctx context.Context, accountID int64, startTime, endTime time.Time) ([]service.RecentAccountUser, error) {
return nil, nil
}

type stubSettingRepo struct {
all map[string]string
}
Expand Down
1 change: 1 addition & 0 deletions backend/internal/server/routes/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@ func registerAccountRoutes(admin *gin.RouterGroup, h *handler.Handlers) {
accounts.GET("/:id/usage", h.Admin.Account.GetUsage)
accounts.GET("/:id/today-stats", h.Admin.Account.GetTodayStats)
accounts.POST("/today-stats/batch", h.Admin.Account.GetBatchTodayStats)
accounts.GET("/:id/recent-users", h.Admin.Account.GetRecentUsers)
accounts.POST("/:id/clear-rate-limit", h.Admin.Account.ClearRateLimit)
accounts.POST("/:id/reset-quota", h.Admin.Account.ResetQuota)
accounts.GET("/:id/temp-unschedulable", h.Admin.Account.GetTempUnschedulable)
Expand Down
Loading
Loading