Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 25 additions & 5 deletions backend/modules/observability/application/openapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ func (o *OpenAPIApplication) OtelIngestTraces(ctx context.Context, req *openapi.
if err != nil {
return nil, err
}
spansMap := unpackSpace(req.WorkspaceID, reqSpanProto)
spansMap := o.unpackOtelSpace(ctx, req.WorkspaceID, reqSpanProto)
partialFailSpanNumber := 0
partialErrMessage := ""
for workspaceId, otelSpans := range spansMap {
Expand Down Expand Up @@ -338,16 +338,16 @@ func ungzip(contentEncoding string, data []byte) ([]byte, error) {
return uncompressedData.Bytes(), nil
}

func unpackSpace(outerSpaceID string, reqSpanProto *otel.ExportTraceServiceRequest) map[string][]*otel.ResourceScopeSpan {
func (o *OpenAPIApplication) unpackOtelSpace(ctx context.Context, outerSpaceID string, reqSpanProto *otel.ExportTraceServiceRequest) map[string][]*otel.ResourceScopeSpan {
if reqSpanProto == nil {
return nil
}
spansMap := make(map[string][]*otel.ResourceScopeSpan)
for _, resourceSpans := range reqSpanProto.ResourceSpans {
for _, scopeSpans := range resourceSpans.ScopeSpans {
for _, span := range scopeSpans.Spans {
for _, scopeSpan := range scopeSpans.Spans {
spaceID := ""
for _, attribute := range span.Attributes {
for _, attribute := range scopeSpan.Attributes {
if attribute.Key == otel.OtelAttributeWorkSpaceID {
spaceID = attribute.Value.GetStringValue()
break
Expand All @@ -356,13 +356,17 @@ func unpackSpace(outerSpaceID string, reqSpanProto *otel.ExportTraceServiceReque
if spaceID == "" {
spaceID = outerSpaceID
}
if spaceID == "" {
claim := o.auth.GetClaim(ctx)
spaceID = o.workspace.GetIngestWorkSpaceID(ctx, []*span.InputSpan{o.convertOtelTag2InputSpan(scopeSpan)}, claim)
}
if spansMap[spaceID] == nil {
spansMap[spaceID] = make([]*otel.ResourceScopeSpan, 0)
}
spansMap[spaceID] = append(spansMap[spaceID], &otel.ResourceScopeSpan{
Resource: resourceSpans.Resource,
Scope: scopeSpans.Scope,
Span: span,
Span: scopeSpan,
})

}
Expand All @@ -372,6 +376,22 @@ func unpackSpace(outerSpaceID string, reqSpanProto *otel.ExportTraceServiceReque
return spansMap
}

func (o *OpenAPIApplication) convertOtelTag2InputSpan(scopeSpan *otel.Span) *span.InputSpan {
if scopeSpan == nil {
return nil
}
tags := make(map[string]string, 0)
for _, attribute := range scopeSpan.Attributes {
if attribute.Value.IsStringValue() {
tags[attribute.Key] = attribute.Value.GetStringValue()
}
}

return &span.InputSpan{
TagsString: tags,
}
}

func unmarshalOtelSpan(spanSrc []byte, contentType string) (*otel.ExportTraceServiceRequest, error) {
finalResult := &otel.ExportTraceServiceRequest{}
if strings.Contains(contentType, otel.ContentTypeProtoBuf) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
// Copyright (c) 2025 coze-dev Authors
// SPDX-License-Identifier: Apache-2.0

package application

import (
"context"
"testing"

rpcmocks "github.com/coze-dev/coze-loop/backend/modules/observability/domain/component/rpc/mocks"
workspacemocks "github.com/coze-dev/coze-loop/backend/modules/observability/domain/component/workspace/mocks"
"github.com/coze-dev/coze-loop/backend/modules/observability/lib/otel"
"github.com/stretchr/testify/assert"
"go.uber.org/mock/gomock"
)

func TestOpenAPIApplication_unpackOtelSpace(t *testing.T) {
type fields struct {
auth func(ctrl *gomock.Controller) *rpcmocks.MockIAuthProvider
workspace func(ctrl *gomock.Controller) *workspacemocks.MockIWorkSpaceProvider
}
type args struct {
ctx context.Context
outerSpaceID string
reqSpanProto *otel.ExportTraceServiceRequest
}
tests := []struct {
name string
fields fields
args args
want map[string]int // spaceID -> number of spans
}{
{
name: "reqSpanProto is nil",
fields: fields{},
args: args{
reqSpanProto: nil,
},
want: nil,
},
{
name: "spaceID in span attributes",
fields: fields{
auth: func(ctrl *gomock.Controller) *rpcmocks.MockIAuthProvider {
return rpcmocks.NewMockIAuthProvider(ctrl)
},
workspace: func(ctrl *gomock.Controller) *workspacemocks.MockIWorkSpaceProvider {
return workspacemocks.NewMockIWorkSpaceProvider(ctrl)
},
},
args: args{
ctx: context.Background(),
outerSpaceID: "outer",
reqSpanProto: &otel.ExportTraceServiceRequest{
ResourceSpans: []*otel.ResourceSpans{
{
ScopeSpans: []*otel.ScopeSpans{
{
Spans: []*otel.Span{
{
Attributes: []*otel.KeyValue{
{
Key: otel.OtelAttributeWorkSpaceID,
Value: &otel.AnyValue{
Value: &otel.AnyValue_StringValue{
StringValue: "inner",
},
},
},
},
},
},
},
},
},
},
},
},
want: map[string]int{"inner": 1},
},
{
name: "spaceID not in attributes, use outerSpaceID",
fields: fields{
auth: func(ctrl *gomock.Controller) *rpcmocks.MockIAuthProvider {
return rpcmocks.NewMockIAuthProvider(ctrl)
},
workspace: func(ctrl *gomock.Controller) *workspacemocks.MockIWorkSpaceProvider {
return workspacemocks.NewMockIWorkSpaceProvider(ctrl)
},
},
args: args{
ctx: context.Background(),
outerSpaceID: "outer",
reqSpanProto: &otel.ExportTraceServiceRequest{
ResourceSpans: []*otel.ResourceSpans{
{
ScopeSpans: []*otel.ScopeSpans{
{
Spans: []*otel.Span{
{
Attributes: []*otel.KeyValue{},
},
},
},
},
},
},
},
},
want: map[string]int{"outer": 1},
},
{
name: "both empty, call GetIngestWorkSpaceID",
fields: fields{
auth: func(ctrl *gomock.Controller) *rpcmocks.MockIAuthProvider {
m := rpcmocks.NewMockIAuthProvider(ctrl)
m.EXPECT().GetClaim(gomock.Any()).Return(nil)
return m
},
workspace: func(ctrl *gomock.Controller) *workspacemocks.MockIWorkSpaceProvider {
m := workspacemocks.NewMockIWorkSpaceProvider(ctrl)
m.EXPECT().GetIngestWorkSpaceID(gomock.Any(), gomock.Any(), gomock.Any()).Return("mocked")
return m
},
},
args: args{
ctx: context.Background(),
outerSpaceID: "",
reqSpanProto: &otel.ExportTraceServiceRequest{
ResourceSpans: []*otel.ResourceSpans{
{
ScopeSpans: []*otel.ScopeSpans{
{
Spans: []*otel.Span{
{
Attributes: []*otel.KeyValue{},
},
},
},
},
},
},
},
},
want: map[string]int{"mocked": 1},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

var authProvider *rpcmocks.MockIAuthProvider
if tt.fields.auth != nil {
authProvider = tt.fields.auth(ctrl)
}
var workspaceProvider *workspacemocks.MockIWorkSpaceProvider
if tt.fields.workspace != nil {
workspaceProvider = tt.fields.workspace(ctrl)
}

o := &OpenAPIApplication{
auth: authProvider,
workspace: workspaceProvider,
}
got := o.unpackOtelSpace(tt.args.ctx, tt.args.outerSpaceID, tt.args.reqSpanProto)
if tt.want == nil {
assert.Nil(t, got)
} else {
assert.Equal(t, len(tt.want), len(got))
for k, v := range tt.want {
assert.Equal(t, v, len(got[k]))
}
}
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ import (
type Literal string

const (
TextLiteral Literal = "text"
ImageLiteral Literal = "image"
TextLiteral Literal = "text"
ImageLiteral Literal = "image"
ImageUrlLiteral Literal = "image_url"
)

type ModelMessagePartType string
Expand Down Expand Up @@ -89,7 +90,7 @@ func convertModelMsg(msg map[string]interface{}) map[string]interface{} {
case string(TextLiteral):
part["type"] = string(ModelMessagePartTypeText)
part["text"] = text
case string(ImageLiteral):
case string(ImageLiteral), string(ImageUrlLiteral):
part["type"] = string(ModelMessagePartTypeImage)
imageMap, ok := image.(map[string]interface{})
if ok {
Expand Down
Loading