diff --git a/backend/modules/observability/domain/metric/service/metric.go b/backend/modules/observability/domain/metric/service/metric.go index 1c215f07e..1b854be51 100644 --- a/backend/modules/observability/domain/metric/service/metric.go +++ b/backend/modules/observability/domain/metric/service/metric.go @@ -27,6 +27,7 @@ import ( "github.com/coze-dev/coze-loop/backend/pkg/lang/goroutine" "github.com/coze-dev/coze-loop/backend/pkg/lang/ptr" "github.com/coze-dev/coze-loop/backend/pkg/logs" + timeutil "github.com/coze-dev/coze-loop/backend/pkg/time" "github.com/samber/lo" "golang.org/x/sync/errgroup" ) @@ -43,6 +44,7 @@ type QueryMetricsReq struct { StartTime int64 EndTime int64 GroupBySpaceID bool + Source span_filter.SourceType } type QueryMetricsResp struct { @@ -256,6 +258,11 @@ func (m *MetricsService) QueryMetrics(ctx context.Context, req *QueryMetricsReq) if val != nil && val.DisableQuery { return &QueryMetricsResp{}, nil } + if req.FilterFields != nil { + if err := req.FilterFields.Traverse(processSpecificFilter); err != nil { + return nil, errorx.WrapByCode(err, obErrorx.CommercialCommonInvalidParamCodeCode) + } + } for _, metricName := range req.MetricsNames { mVal, ok := m.metricDefMap[metricName] if !ok { @@ -405,9 +412,12 @@ func (m *MetricsService) buildOnlineMetricQuery(ctx context.Context, req *QueryM EndAt: req.EndTime, } mBuilder := &metricQueryBuilder{ - metricNames: req.MetricsNames, - filter: filter, - spanEnv: &span_filter.SpanEnv{WorkspaceID: req.WorkspaceID}, + metricNames: req.MetricsNames, + filter: filter, + spanEnv: &span_filter.SpanEnv{ + WorkspaceID: req.WorkspaceID, + Source: req.Source, + }, requestFilter: req.FilterFields, granularity: req.Granularity, } @@ -872,3 +882,72 @@ func getDaysBeforeTimeStamp(days int) int64 { daysBefore := time.Date(now.Year(), now.Month(), now.Day()-days, 0, 0, 0, 0, now.Location()) return daysBefore.UnixMilli() } + +// TODO: 合并,有三个地方实现一样... +func processSpecificFilter(f *loop_span.FilterField) error { + switch f.FieldName { + case loop_span.SpanFieldStatus: + if err := processStatusFilter(f); err != nil { + return err + } + case loop_span.SpanFieldDuration, + loop_span.SpanFieldLatencyFirstResp, + loop_span.SpanFieldStartTimeFirstResp, + loop_span.SpanFieldStartTimeFirstTokenResp, + loop_span.SpanFieldLatencyFirstTokenResp, + loop_span.SpanFieldReasoningDuration: + if err := processLatencyFilter(f); err != nil { + return err + } + } + return nil +} + +func processStatusFilter(f *loop_span.FilterField) error { + if f.QueryType == nil || *f.QueryType != loop_span.QueryTypeEnumIn { + return fmt.Errorf("status filter should use in operator") + } + f.FieldName = loop_span.SpanFieldStatusCode + f.FieldType = loop_span.FieldTypeLong + checkSuccess, checkError := false, false + for _, val := range f.Values { + switch val { + case loop_span.SpanStatusSuccess: + checkSuccess = true + case loop_span.SpanStatusError: + checkError = true + default: + return fmt.Errorf("invalid status code field value") + } + } + if checkSuccess && checkError { + f.QueryType = ptr.Of(loop_span.QueryTypeEnumAlwaysTrue) + f.Values = nil + } else if checkSuccess { + f.Values = []string{"0"} + } else if checkError { + f.QueryType = ptr.Of(loop_span.QueryTypeEnumNotIn) + f.Values = []string{"0"} + } else { + return fmt.Errorf("invalid status code query") + } + return nil +} + +// ms -> us +func processLatencyFilter(f *loop_span.FilterField) error { + if f.FieldType != loop_span.FieldTypeLong { + return fmt.Errorf("latency field type should be long ") + } + micros := make([]string, 0) + for _, val := range f.Values { + integer, err := strconv.ParseInt(val, 10, 64) + if err != nil { + return fmt.Errorf("fail to parse long value %s, %v", val, err) + } + integer = timeutil.MillSec2MicroSec(integer) + micros = append(micros, strconv.FormatInt(integer, 10)) + } + f.Values = micros + return nil +} diff --git a/backend/modules/observability/domain/trace/service/trace_service.go b/backend/modules/observability/domain/trace/service/trace_service.go index 860220fa4..6fffbb59b 100644 --- a/backend/modules/observability/domain/trace/service/trace_service.go +++ b/backend/modules/observability/domain/trace/service/trace_service.go @@ -53,6 +53,7 @@ type ListSpansReq struct { PageToken string PlatformType loop_span.PlatformType SpanListType loop_span.SpanListType + Source span_filter.SourceType } type ListSpansResp struct { @@ -1507,6 +1508,7 @@ func (r *TraceServiceImpl) buildBuiltinFilters(ctx context.Context, f span_filte env := &span_filter.SpanEnv{ WorkspaceID: req.WorkspaceID, ThirdPartyWorkspaceID: req.ThirdPartyWorkspaceID, + Source: req.Source, } basicFilter, forceQuery, err := f.BuildBasicSpanFilter(ctx, env) if err != nil {