Skip to content

Commit 614eeaa

Browse files
kidkidkidHearyShen
authored andcommitted
[feat][backend]: expand metric query (#414)
* feat(backend): add metric source * feat(backend): fix filter
1 parent 521e64f commit 614eeaa

2 files changed

Lines changed: 84 additions & 3 deletions

File tree

backend/modules/observability/domain/metric/service/metric.go

Lines changed: 82 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"github.com/coze-dev/coze-loop/backend/pkg/lang/goroutine"
2828
"github.com/coze-dev/coze-loop/backend/pkg/lang/ptr"
2929
"github.com/coze-dev/coze-loop/backend/pkg/logs"
30+
timeutil "github.com/coze-dev/coze-loop/backend/pkg/time"
3031
"github.com/samber/lo"
3132
"golang.org/x/sync/errgroup"
3233
)
@@ -43,6 +44,7 @@ type QueryMetricsReq struct {
4344
StartTime int64
4445
EndTime int64
4546
GroupBySpaceID bool
47+
Source span_filter.SourceType
4648
}
4749

4850
type QueryMetricsResp struct {
@@ -256,6 +258,11 @@ func (m *MetricsService) QueryMetrics(ctx context.Context, req *QueryMetricsReq)
256258
if val != nil && val.DisableQuery {
257259
return &QueryMetricsResp{}, nil
258260
}
261+
if req.FilterFields != nil {
262+
if err := req.FilterFields.Traverse(processSpecificFilter); err != nil {
263+
return nil, errorx.WrapByCode(err, obErrorx.CommercialCommonInvalidParamCodeCode)
264+
}
265+
}
259266
for _, metricName := range req.MetricsNames {
260267
mVal, ok := m.metricDefMap[metricName]
261268
if !ok {
@@ -405,9 +412,12 @@ func (m *MetricsService) buildOnlineMetricQuery(ctx context.Context, req *QueryM
405412
EndAt: req.EndTime,
406413
}
407414
mBuilder := &metricQueryBuilder{
408-
metricNames: req.MetricsNames,
409-
filter: filter,
410-
spanEnv: &span_filter.SpanEnv{WorkspaceID: req.WorkspaceID},
415+
metricNames: req.MetricsNames,
416+
filter: filter,
417+
spanEnv: &span_filter.SpanEnv{
418+
WorkspaceID: req.WorkspaceID,
419+
Source: req.Source,
420+
},
411421
requestFilter: req.FilterFields,
412422
granularity: req.Granularity,
413423
}
@@ -872,3 +882,72 @@ func getDaysBeforeTimeStamp(days int) int64 {
872882
daysBefore := time.Date(now.Year(), now.Month(), now.Day()-days, 0, 0, 0, 0, now.Location())
873883
return daysBefore.UnixMilli()
874884
}
885+
886+
// TODO: 合并,有三个地方实现一样...
887+
func processSpecificFilter(f *loop_span.FilterField) error {
888+
switch f.FieldName {
889+
case loop_span.SpanFieldStatus:
890+
if err := processStatusFilter(f); err != nil {
891+
return err
892+
}
893+
case loop_span.SpanFieldDuration,
894+
loop_span.SpanFieldLatencyFirstResp,
895+
loop_span.SpanFieldStartTimeFirstResp,
896+
loop_span.SpanFieldStartTimeFirstTokenResp,
897+
loop_span.SpanFieldLatencyFirstTokenResp,
898+
loop_span.SpanFieldReasoningDuration:
899+
if err := processLatencyFilter(f); err != nil {
900+
return err
901+
}
902+
}
903+
return nil
904+
}
905+
906+
func processStatusFilter(f *loop_span.FilterField) error {
907+
if f.QueryType == nil || *f.QueryType != loop_span.QueryTypeEnumIn {
908+
return fmt.Errorf("status filter should use in operator")
909+
}
910+
f.FieldName = loop_span.SpanFieldStatusCode
911+
f.FieldType = loop_span.FieldTypeLong
912+
checkSuccess, checkError := false, false
913+
for _, val := range f.Values {
914+
switch val {
915+
case loop_span.SpanStatusSuccess:
916+
checkSuccess = true
917+
case loop_span.SpanStatusError:
918+
checkError = true
919+
default:
920+
return fmt.Errorf("invalid status code field value")
921+
}
922+
}
923+
if checkSuccess && checkError {
924+
f.QueryType = ptr.Of(loop_span.QueryTypeEnumAlwaysTrue)
925+
f.Values = nil
926+
} else if checkSuccess {
927+
f.Values = []string{"0"}
928+
} else if checkError {
929+
f.QueryType = ptr.Of(loop_span.QueryTypeEnumNotIn)
930+
f.Values = []string{"0"}
931+
} else {
932+
return fmt.Errorf("invalid status code query")
933+
}
934+
return nil
935+
}
936+
937+
// ms -> us
938+
func processLatencyFilter(f *loop_span.FilterField) error {
939+
if f.FieldType != loop_span.FieldTypeLong {
940+
return fmt.Errorf("latency field type should be long ")
941+
}
942+
micros := make([]string, 0)
943+
for _, val := range f.Values {
944+
integer, err := strconv.ParseInt(val, 10, 64)
945+
if err != nil {
946+
return fmt.Errorf("fail to parse long value %s, %v", val, err)
947+
}
948+
integer = timeutil.MillSec2MicroSec(integer)
949+
micros = append(micros, strconv.FormatInt(integer, 10))
950+
}
951+
f.Values = micros
952+
return nil
953+
}

backend/modules/observability/domain/trace/service/trace_service.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ type ListSpansReq struct {
5353
PageToken string
5454
PlatformType loop_span.PlatformType
5555
SpanListType loop_span.SpanListType
56+
Source span_filter.SourceType
5657
}
5758

5859
type ListSpansResp struct {
@@ -1507,6 +1508,7 @@ func (r *TraceServiceImpl) buildBuiltinFilters(ctx context.Context, f span_filte
15071508
env := &span_filter.SpanEnv{
15081509
WorkspaceID: req.WorkspaceID,
15091510
ThirdPartyWorkspaceID: req.ThirdPartyWorkspaceID,
1511+
Source: req.Source,
15101512
}
15111513
basicFilter, forceQuery, err := f.BuildBasicSpanFilter(ctx, env)
15121514
if err != nil {

0 commit comments

Comments
 (0)