Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/coze-dev/coze-loop/backend/modules/observability/domain/trace/entity/loop_span"
"github.com/coze-dev/coze-loop/backend/pkg/json"
"github.com/coze-dev/coze-loop/backend/pkg/logs"
"github.com/coze-dev/cozeloop-go/spec/tracespec"
)

func getCategory(taskType task.TaskType) entity.DatasetCategory {
Expand Down Expand Up @@ -194,7 +193,13 @@ func buildItem(ctx context.Context, span *loop_span.Span, fieldMappings []*task_
logs.CtxInfo(ctx, "Evaluator field key is empty, name:%v", fieldSchema.Name)
continue
}
value, err := span.ExtractByJsonpath(ctx, mapping.TraceFieldKey, mapping.TraceFieldJsonpath)
var value string
var err error
if fieldSchema.ContentType == entity.ContentType_MultiPart {
value, err = span.ExtractByJsonpathRaw(ctx, mapping.TraceFieldKey, mapping.TraceFieldJsonpath)
} else {
value, err = span.ExtractByJsonpath(ctx, mapping.TraceFieldKey, mapping.TraceFieldJsonpath)
}
if err != nil {
logs.CtxInfo(ctx, "Extract field failed, err:%v", err)
continue
Expand All @@ -214,53 +219,3 @@ func buildItem(ctx context.Context, span *loop_span.Span, fieldMappings []*task_
}
return fieldDatas
}

// Deprecated: use common function entity.GetContentInfo instead
// GetContentInfo todo:[xun]和手动回流的代码逻辑一样,需要抽取公共代码
func GetContentInfo(ctx context.Context, contentType entity.ContentType, value string) (*common.Content, error) {
var content *common.Content
switch contentType {
case entity.ContentType_MultiPart:
var parts []tracespec.ModelMessagePart
err := json.Unmarshal([]byte(value), &parts)
if err != nil {
logs.CtxInfo(ctx, "Unmarshal multi part failed, err:%v", err)
return nil, err
}
var multiPart []*common.Content
for _, part := range parts {
// 本期仅支持回流图片的多模态数据,非ImageURL信息的,打包放进text
switch part.Type {
case tracespec.ModelMessagePartTypeImage:
if part.ImageURL == nil {
continue
}
multiPart = append(multiPart, &common.Content{
ContentType: gptr.Of(common.ContentTypeImage),
Image: &common.Image{
Name: gptr.Of(part.ImageURL.Name),
URL: gptr.Of(part.ImageURL.URL),
},
})
case tracespec.ModelMessagePartTypeText, tracespec.ModelMessagePartTypeFile:
multiPart = append(multiPart, &common.Content{
ContentType: gptr.Of(common.ContentTypeText),
Text: gptr.Of(part.Text),
})
default:
logs.CtxWarn(ctx, "Unsupported part type: %s", part.Type)
return nil, err
}
}
content = &common.Content{
ContentType: gptr.Of(common.ContentTypeMultiPart),
MultiPart: multiPart,
}
default:
content = &common.Content{
ContentType: gptr.Of(common.ContentTypeText),
Text: gptr.Of(value),
}
}
return content, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -589,7 +589,18 @@ func (s *Span) AddAutoEvalAnnotation(taskID, evaluatorRecordID, evaluatorVersion
}

// ExtractByJsonpath 从Span的Input/Output/Tags中提取数据,根据jsonpath返回结果。时间戳按毫秒返回。
// 会递归解析嵌套的 JSON 字符串。
func (s *Span) ExtractByJsonpath(ctx context.Context, key string, jsonpath string) (string, error) {
return s.extractByJsonpath(ctx, key, jsonpath, true)
}

// ExtractByJsonpathRaw 从Span的Input/Output/Tags中提取数据,根据jsonpath返回结果。时间戳按毫秒返回。
// 不会递归解析嵌套的 JSON 字符串,保持原始格式。适用于 MultiPart 类型数据提取。
func (s *Span) ExtractByJsonpathRaw(ctx context.Context, key string, jsonpath string) (string, error) {
return s.extractByJsonpath(ctx, key, jsonpath, false)
}

func (s *Span) extractByJsonpath(ctx context.Context, key string, jsonpath string, recursive bool) (string, error) {
jsonpath = strings.TrimPrefix(jsonpath, key)
jsonpath = strings.TrimPrefix(jsonpath, ".")
data := ""
Expand Down Expand Up @@ -618,7 +629,10 @@ func (s *Span) ExtractByJsonpath(ctx context.Context, key string, jsonpath strin
return data, nil
}

return json.GetStringByJSONPathRecursively(data, jsonpath)
if recursive {
return json.GetStringByJSONPathRecursively(data, jsonpath)
}
return json.GetStringByJSONPath(data, jsonpath)
}

func validField(clipFields *[]string, key, value string) string {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"testing"
"time"

"github.com/coze-dev/cozeloop-go/spec/tracespec"

"github.com/coze-dev/coze-loop/backend/pkg/json"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -1061,3 +1063,55 @@ func TestSpanList_FilterModelSpans(t *testing.T) {
})
}
}

func TestSpan_ExtractByJsonpathRaw(t *testing.T) {
t.Parallel()
ctx := context.Background()

t.Run("multipart data preserved as original format", func(t *testing.T) {
multipartData := `[{"text":"# Input Data\n<Input_Image>\n","type":"text"},{"text":"[图片-1]\n","type":"text"},{"image_url":{"detail":{"image_resolution":"auto"},"url":"http://example.com/img.jpg"},"type":"image_url"}]`
span := &Span{
Input: `{"content":` + multipartData + `}`,
}

resultRaw, err := span.ExtractByJsonpathRaw(ctx, "Input", "content")
assert.NoError(t, err)
assert.Contains(t, resultRaw, `"detail":{"image_resolution":"auto"}`)
assert.Contains(t, resultRaw, `"type":"text"`)
assert.Contains(t, resultRaw, `"type":"image_url"`)
var parts []tracespec.ModelMessagePart
err = json.Unmarshal([]byte(resultRaw), &parts)
assert.Error(t, err)
})

t.Run("simple json should work same for both methods", func(t *testing.T) {
span := &Span{
Input: `{"name": "test", "value": 123}`,
}

resultRaw, err := span.ExtractByJsonpathRaw(ctx, "Input", "name")
assert.NoError(t, err)
assert.Equal(t, "test", resultRaw)

resultRecursive, err := span.ExtractByJsonpath(ctx, "Input", "name")
assert.NoError(t, err)
assert.Equal(t, "test", resultRecursive)
})

t.Run("real multipart use case from dataset import", func(t *testing.T) {
multipartJSON := `[{"text":"# Input Data\n\u003cInput_Image\u003e\n","type":"text"},{"text":"[图片-1]\n","type":"text"},{"image_url":{"detail":"{\"image_resolution\":\"auto\"}","url":""},"type":"image_url"}]`
span := &Span{
Input: `{"messages":[{"role":"user","content":` + multipartJSON + `}]}`,
}

result, err := span.ExtractByJsonpathRaw(ctx, "Input", "messages[0].content")
assert.NoError(t, err)

assert.Contains(t, result, `"type":"text"`)
assert.Contains(t, result, `"type":"image_url"`)
assert.Contains(t, result, `\u003cInput_Image\u003e`)
var parts []tracespec.ModelMessagePart
err = json.Unmarshal([]byte(result), &parts)
assert.NoError(t, err)
})
}
Original file line number Diff line number Diff line change
Expand Up @@ -499,9 +499,12 @@ func (r *TraceExportServiceImpl) buildItem(ctx context.Context, span *loop_span.
}
}
} else {
value, err = span.ExtractByJsonpath(ctx, mapping.TraceFieldKey, mapping.TraceFieldJsonpath)
if mapping.FieldSchema.ContentType == entity.ContentType_MultiPart {
value, err = span.ExtractByJsonpathRaw(ctx, mapping.TraceFieldKey, mapping.TraceFieldJsonpath)
} else {
value, err = span.ExtractByJsonpath(ctx, mapping.TraceFieldKey, mapping.TraceFieldJsonpath)
}
if err != nil {
// 非json但使用了jsonpath,也不报错,置空
logs.CtxInfo(ctx, "Extract field failed, err:%v", err)
}
}
Expand Down
Loading