Skip to content

Commit 8c1902e

Browse files
csg-pr-botDev Agent
authored andcommitted
aigateway: structured metering resource IDs and RecordUsage cleanup (#979)
Co-authored-by: Dev Agent <dev-agent@example.com>
1 parent 27c4982 commit 8c1902e

File tree

7 files changed

+318
-299
lines changed

7 files changed

+318
-299
lines changed

_mocks/opencsg.com/csghub-server/aigateway/component/mock_OpenAIComponent.go

Lines changed: 10 additions & 11 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

aigateway/component/openai.go

Lines changed: 118 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ type OpenAIComponent interface {
3030
GetAvailableModels(c context.Context, user string) ([]types.Model, error)
3131
ListModels(c context.Context, user string, req types.ListModelsReq) (types.ModelList, error)
3232
GetModelByID(c context.Context, username, modelID string) (*types.Model, error)
33-
RecordUsage(c context.Context, userUUID string, model *types.Model, tokenCounter token.Counter, sceneValue string) error
33+
RecordUsage(c context.Context, userUUID string, model *types.Model, tokenCounter token.Counter) error
3434
CheckBalance(ctx context.Context, username, userUUID string) error
3535
}
3636

@@ -442,88 +442,157 @@ func getSceneFromSvcType(svcType int) int {
442442
}
443443
}
444444

445-
func (m *openaiComponentImpl) RecordUsage(c context.Context, userUUID string, model *types.Model, counter token.Counter, sceneValue string) error {
446-
usage, err := counter.Usage(c)
447-
if err != nil {
448-
return fmt.Errorf("failed to get token usage from counter,error:%w", err)
445+
// csghubMeteringLLMTypeFromModel returns metadata llm_type (e.g. serverless, inference) used as the path component in csghub://… metering URIs.
446+
func csghubMeteringLLMTypeFromModel(m *types.Model) (string, error) {
447+
if m == nil {
448+
return "", fmt.Errorf("model is nil")
449+
}
450+
if m.Metadata == nil {
451+
return "", fmt.Errorf("model metadata is nil: cannot resolve %s for resource path", types.MetaKeyLLMType)
449452
}
453+
llmType, ok := m.Metadata[types.MetaKeyLLMType].(string)
454+
if !ok {
455+
return "", fmt.Errorf("model metadata %s missing or not a string", types.MetaKeyLLMType)
456+
}
457+
return llmType, nil
458+
}
450459

451-
scene := parseScene(sceneValue)
452-
slog.DebugContext(c, "token usage", slog.Any("usage", usage), slog.Any("scene", scene))
453-
var tokenUsageExtra = struct {
454-
PromptTokenNum string `json:"prompt_token_num"`
455-
CompletionTokenNum string `json:"completion_token_num"`
456-
// 0: external, 1: owner is user, 2: other user is inference, 3: serverless
457-
OwnerType commontypes.TokenUsageType `json:"owner_type"`
458-
}{
459-
PromptTokenNum: fmt.Sprintf("%d", usage.PromptTokens),
460-
CompletionTokenNum: fmt.Sprintf("%d", usage.CompletionTokens),
460+
// meteringResourceFromModel builds a MeteringResource from an OpenAI gateway model (see types.MeteringResource).
461+
func meteringResourceFromModel(model *types.Model) (types.MeteringResource, error) {
462+
if model == nil {
463+
return types.MeteringResource{}, fmt.Errorf("model is nil")
464+
}
465+
if model.CSGHubModelID != "" {
466+
llmType, err := csghubMeteringLLMTypeFromModel(model)
467+
if err != nil {
468+
return types.MeteringResource{}, err
469+
}
470+
id := fmt.Sprintf(types.CSGHubResourceFmt, llmType, model.CSGHubModelID)
471+
return types.MeteringResource{
472+
ResourceID: id,
473+
ResourceName: id,
474+
CustomerID: model.SvcName,
475+
}, nil
476+
}
477+
if model.Provider != "" {
478+
id := fmt.Sprintf(types.ExternalLLMResourceFmt, model.Provider, model.ID)
479+
return types.MeteringResource{
480+
ResourceID: id,
481+
ResourceName: id,
482+
CustomerID: id,
483+
}, nil
484+
}
485+
return types.MeteringResource{}, nil
486+
}
487+
488+
// tokenUsageMeteringExtra is serialized into MeteringEvent.Extra for token billing breakdown.
489+
type tokenUsageMeteringExtra struct {
490+
PromptTokenNum string `json:"prompt_token_num"`
491+
CompletionTokenNum string `json:"completion_token_num"`
492+
OwnerType commontypes.TokenUsageType `json:"owner_type"`
493+
}
494+
495+
func validateModelForUsageRecord(c context.Context, model *types.Model) error {
496+
if model == nil {
497+
return fmt.Errorf("record usage: model is nil")
461498
}
462499
if model.CSGHubModelID != "" && model.Provider != "" {
463500
slog.WarnContext(c, "bad model info, both csghub model id and external model provider is set",
464-
slog.Any("model info", model))
501+
slog.Any("model", model))
502+
return fmt.Errorf("record usage: conflicting csghub model id and external provider")
465503
}
466504
if model.CSGHubModelID == "" && model.Provider == "" {
467505
slog.WarnContext(c, "bad model info, both csghub model id and external model provider is not set",
468-
slog.Any("model info", model))
506+
slog.Any("model", model))
507+
return fmt.Errorf("record usage: model missing resource identifiers")
508+
}
509+
return nil
510+
}
511+
512+
func (m *openaiComponentImpl) tokenUsageMeteringExtraAndScene(c context.Context, userUUID string, model *types.Model, usage *token.Usage) (tokenUsageMeteringExtra, commontypes.SceneType, error) {
513+
scene := commontypes.SceneModelServerless
514+
extra := tokenUsageMeteringExtra{
515+
PromptTokenNum: fmt.Sprintf("%d", usage.PromptTokens),
516+
CompletionTokenNum: fmt.Sprintf("%d", usage.CompletionTokens),
469517
}
470518
if model.CSGHubModelID != "" {
471519
switch model.SvcType {
472520
case commontypes.ServerlessType:
473-
tokenUsageExtra.OwnerType = commontypes.CSGHubServerlessInference
521+
extra.OwnerType = commontypes.CSGHubServerlessInference
474522
case commontypes.InferenceType:
475523
if model.OwnerUUID == userUUID {
476-
tokenUsageExtra.OwnerType = commontypes.CSGHubUserDeployedInference
524+
extra.OwnerType = commontypes.CSGHubUserDeployedInference
477525
} else {
478526
belong, err := m.checkOrganization(c, userUUID, model.OwnerUUID)
479527
if err != nil {
480-
return fmt.Errorf("failed to check organization,error:%w", err)
528+
return tokenUsageMeteringExtra{}, 0, fmt.Errorf("failed to check organization: %w", err)
481529
}
482530
if belong {
483-
tokenUsageExtra.OwnerType = commontypes.CSGHubOrganFellowDeployedInference
531+
extra.OwnerType = commontypes.CSGHubOrganFellowDeployedInference
484532
} else {
485-
tokenUsageExtra.OwnerType = commontypes.CSGHubOtherDeployedInference
533+
extra.OwnerType = commontypes.CSGHubOtherDeployedInference
486534
}
487535
}
536+
scene = commontypes.SceneModelInference
488537
default:
489-
slog.WarnContext(c, "bad model info, csghub model missing service type",
490-
slog.Any("model info", model))
538+
slog.ErrorContext(c, "bad model info, csghub model missing service type", slog.Any("model", model))
539+
return tokenUsageMeteringExtra{}, 0, fmt.Errorf("record usage: csghub model has invalid or missing service type")
491540
}
541+
} else if model.Provider != "" {
542+
extra.OwnerType = commontypes.ExternalInference
492543
}
493-
if model.Provider != "" {
494-
tokenUsageExtra.OwnerType = commontypes.ExternalInference
495-
}
544+
return extra, scene, nil
545+
}
496546

497-
extraData, _ := json.Marshal(tokenUsageExtra)
498-
event := commontypes.MeteringEvent{
499-
Uuid: uuid.New(),
500-
UserUUID: userUUID,
501-
Value: usage.TotalTokens,
502-
ValueType: commontypes.TokenNumberType, // count by token
503-
Scene: int(scene),
504-
OpUID: "aigateway",
505-
CreatedAt: time.Now(),
506-
Extra: string(extraData),
547+
func (m *openaiComponentImpl) RecordUsage(c context.Context, userUUID string, model *types.Model, counter token.Counter) error {
548+
usage, err := counter.Usage(c)
549+
if err != nil {
550+
return fmt.Errorf("failed to get token usage from counter: %w", err)
507551
}
508-
if model.CSGHubModelID != "" {
509-
event.ResourceID = model.CSGHubModelID
510-
event.ResourceName = model.CSGHubModelID
511-
event.CustomerID = model.SvcName
552+
if err := validateModelForUsageRecord(c, model); err != nil {
553+
return err
512554
}
513-
if model.Provider != "" {
514-
extendModelKey := fmt.Sprintf("%s:%s", model.Provider, model.ID)
515-
event.ResourceID = extendModelKey
516-
event.ResourceName = extendModelKey
517-
event.CustomerID = extendModelKey
555+
res, ridErr := meteringResourceFromModel(model)
556+
if ridErr != nil {
557+
slog.ErrorContext(c, "cannot record usage: invalid model for resource id", slog.Any("error", ridErr), slog.Any("model", model))
558+
return fmt.Errorf("cannot record usage: %w", ridErr)
559+
}
560+
if res.ResourceID == "" {
561+
slog.ErrorContext(c, "cannot record usage: empty resource id for model", slog.Any("model", model))
562+
return fmt.Errorf("cannot record usage: empty resource id")
563+
}
564+
extra, scene, err := m.tokenUsageMeteringExtraAndScene(c, userUUID, model, usage)
565+
if err != nil {
566+
return err
567+
}
568+
extraData, err := json.Marshal(extra)
569+
if err != nil {
570+
return fmt.Errorf("failed to marshal token usage extra: %w", err)
571+
}
572+
event := commontypes.MeteringEvent{
573+
Uuid: uuid.New(),
574+
UserUUID: userUUID,
575+
Value: usage.TotalTokens,
576+
ValueType: commontypes.TokenNumberType,
577+
Scene: int(scene),
578+
OpUID: "aigateway",
579+
CreatedAt: time.Now(),
580+
Extra: string(extraData),
581+
ResourceID: res.ResourceID,
582+
ResourceName: res.ResourceName,
583+
CustomerID: res.CustomerID,
584+
}
585+
eventData, err := json.Marshal(event)
586+
if err != nil {
587+
return fmt.Errorf("failed to marshal metering event: %w", err)
518588
}
519-
eventData, _ := json.Marshal(event)
520589
err = m.eventPub.PublishMeteringEvent(eventData)
521590
if err != nil {
522591
slog.ErrorContext(c, "failed to publish token usage event", slog.Any("event", event), slog.Any("error", err))
523-
return fmt.Errorf("failed to publish token usage event,error:%w", err)
592+
return fmt.Errorf("failed to publish token usage event: %w", err)
524593
}
525594

526-
slog.InfoContext(c, "public token usage event success", slog.Any("event", event))
595+
slog.InfoContext(c, "published token usage event success", slog.Any("event", event))
527596
return nil
528597
}
529598

aigateway/component/openai_ce.go

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import (
1010
"opencsg.com/csghub-server/builder/store/cache"
1111
"opencsg.com/csghub-server/builder/store/database"
1212
"opencsg.com/csghub-server/common/config"
13-
common_types "opencsg.com/csghub-server/common/types"
1413
)
1514

1615
type extendOpenai struct{}
@@ -39,12 +38,6 @@ func (e *openaiComponentImpl) userPreference(ctx context.Context, req *types.Use
3938
return req.Models, nil
4039
}
4140

42-
// parseScene parses the scene value from the HTTP header
43-
// return SceneModelServerless
44-
func parseScene(sceneValue string) common_types.SceneType {
45-
return common_types.SceneModelServerless
46-
}
47-
4841
func (e *extendOpenai) CheckBalance(ctx context.Context, username, userUUID string) error {
4942
return nil
5043
}

0 commit comments

Comments
 (0)