Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 82 additions & 3 deletions backend/modules/observability/domain/metric/service/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/coze-dev/coze-loop/backend/pkg/lang/goroutine"
"github.com/coze-dev/coze-loop/backend/pkg/lang/ptr"
"github.com/coze-dev/coze-loop/backend/pkg/logs"
timeutil "github.com/coze-dev/coze-loop/backend/pkg/time"
"github.com/samber/lo"
"golang.org/x/sync/errgroup"
)
Expand All @@ -43,6 +44,7 @@ type QueryMetricsReq struct {
StartTime int64
EndTime int64
GroupBySpaceID bool
Source span_filter.SourceType
}

type QueryMetricsResp struct {
Expand Down Expand Up @@ -256,6 +258,11 @@ func (m *MetricsService) QueryMetrics(ctx context.Context, req *QueryMetricsReq)
if val != nil && val.DisableQuery {
return &QueryMetricsResp{}, nil
}
if req.FilterFields != nil {
if err := req.FilterFields.Traverse(processSpecificFilter); err != nil {
return nil, errorx.WrapByCode(err, obErrorx.CommercialCommonInvalidParamCodeCode)
}
}
for _, metricName := range req.MetricsNames {
mVal, ok := m.metricDefMap[metricName]
if !ok {
Expand Down Expand Up @@ -405,9 +412,12 @@ func (m *MetricsService) buildOnlineMetricQuery(ctx context.Context, req *QueryM
EndAt: req.EndTime,
}
mBuilder := &metricQueryBuilder{
metricNames: req.MetricsNames,
filter: filter,
spanEnv: &span_filter.SpanEnv{WorkspaceID: req.WorkspaceID},
metricNames: req.MetricsNames,
filter: filter,
spanEnv: &span_filter.SpanEnv{
WorkspaceID: req.WorkspaceID,
Source: req.Source,
},
requestFilter: req.FilterFields,
granularity: req.Granularity,
}
Expand Down Expand Up @@ -872,3 +882,72 @@ func getDaysBeforeTimeStamp(days int) int64 {
daysBefore := time.Date(now.Year(), now.Month(), now.Day()-days, 0, 0, 0, 0, now.Location())
return daysBefore.UnixMilli()
}

// TODO: 合并,有三个地方实现一样...
func processSpecificFilter(f *loop_span.FilterField) error {
switch f.FieldName {
case loop_span.SpanFieldStatus:
if err := processStatusFilter(f); err != nil {
return err
}
case loop_span.SpanFieldDuration,
loop_span.SpanFieldLatencyFirstResp,
loop_span.SpanFieldStartTimeFirstResp,
loop_span.SpanFieldStartTimeFirstTokenResp,
loop_span.SpanFieldLatencyFirstTokenResp,
loop_span.SpanFieldReasoningDuration:
if err := processLatencyFilter(f); err != nil {
return err
}
}
return nil
}

func processStatusFilter(f *loop_span.FilterField) error {
if f.QueryType == nil || *f.QueryType != loop_span.QueryTypeEnumIn {
return fmt.Errorf("status filter should use in operator")
}
f.FieldName = loop_span.SpanFieldStatusCode
f.FieldType = loop_span.FieldTypeLong
checkSuccess, checkError := false, false
for _, val := range f.Values {
switch val {
case loop_span.SpanStatusSuccess:
checkSuccess = true
case loop_span.SpanStatusError:
checkError = true
default:
return fmt.Errorf("invalid status code field value")
}
}
if checkSuccess && checkError {
f.QueryType = ptr.Of(loop_span.QueryTypeEnumAlwaysTrue)
f.Values = nil
} else if checkSuccess {
f.Values = []string{"0"}
} else if checkError {
f.QueryType = ptr.Of(loop_span.QueryTypeEnumNotIn)
f.Values = []string{"0"}
} else {
return fmt.Errorf("invalid status code query")
}
return nil
}

// ms -> us
func processLatencyFilter(f *loop_span.FilterField) error {
if f.FieldType != loop_span.FieldTypeLong {
return fmt.Errorf("latency field type should be long ")
}
micros := make([]string, 0)
for _, val := range f.Values {
integer, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return fmt.Errorf("fail to parse long value %s, %v", val, err)
}
integer = timeutil.MillSec2MicroSec(integer)
micros = append(micros, strconv.FormatInt(integer, 10))
}
f.Values = micros
return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ type ListSpansReq struct {
PageToken string
PlatformType loop_span.PlatformType
SpanListType loop_span.SpanListType
Source span_filter.SourceType
}

type ListSpansResp struct {
Expand Down Expand Up @@ -1507,6 +1508,7 @@ func (r *TraceServiceImpl) buildBuiltinFilters(ctx context.Context, f span_filte
env := &span_filter.SpanEnv{
WorkspaceID: req.WorkspaceID,
ThirdPartyWorkspaceID: req.ThirdPartyWorkspaceID,
Source: req.Source,
}
basicFilter, forceQuery, err := f.BuildBasicSpanFilter(ctx, env)
if err != nil {
Expand Down
Loading