Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion backend/modules/observability/lib/otel/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ const (
otelAttributeInput = "cozeloop.input"
otelAttributeOutput = "cozeloop.output"
otelAttributeLogID = "cozeloop.logid"
otelAttributeStatusCode = "cozeloop.status_code"

apmInput = "gen_ai.input"
apmOutput = "gen_ai.output"
Expand All @@ -34,6 +35,9 @@ const (
otelAttributePromptKey = "cozeloop.prompt_key"
otelAttributePromptVersion = "cozeloop.prompt_version"
otelAttributePromptProvider = "cozeloop.prompt_provider"

// system
otelAttributeSystemRuntime = "cozeloop.system_tag_runtime"
)

// openinference attribute key
Expand Down Expand Up @@ -96,7 +100,10 @@ const (

// otel attribute key
const (
otelAttributeModelInputTools = "gen_ai.tool.definitions"
otelAttributeModelInputTools = "gen_ai.tool.definitions" // model tools

otelAttributeToolInput = "gen_ai.tool.call.arguments" // tool input
otelAttributeToolOutput = "gen_ai.tool.call.result"
)

var otelMessageEventNameMap = []string{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,14 @@ func convertModelMsg(msg map[string]interface{}) map[string]interface{} {
modelMsg["reasoning_content"] = c
}

// contents
// contents or parts
var contents []interface{}
if c, ok := msg["contents"].([]interface{}); ok && len(c) > 0 {
contents = c
} else if c, ok := msg["content"].([]interface{}); ok && len(c) > 0 {
contents = c
partsKey := []string{"contents", "content", "parts"}
for _, key := range partsKey {
if parts, ok := msg[key].([]interface{}); ok && len(parts) > 0 {
contents = parts
break
}
}
if len(contents) > 0 {
parts := make([]interface{}, 0, len(contents))
Expand All @@ -84,6 +86,9 @@ func convertModelMsg(msg map[string]interface{}) map[string]interface{} {
}
typ, _ := mcContent["type"]
text, _ := mcContent["text"]
if text == nil {
text, _ = mcContent["content"]
}
image, _ := mcContent["image_url"]
part := map[string]interface{}{}
switch typ {
Expand Down
39 changes: 31 additions & 8 deletions backend/modules/observability/lib/otel/otel_convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (

// FieldConfMap Field configuration, supports configuring data sources and export methods for fields, currently supports attribute, event, is_tag, data_type
// Among them, attributes and events support configuring multiple, while tags and datatypes only support configuring one.
// For AttributeKey and AttributeKeyPrefix, the field at the head has higher priority, and result will be returned once a match is found.
// For Events, the fields have same priority, and result will be fixed by all fields.
// Other types of configurations need to be manually processed in the code.
var (
FieldConfMap = map[string]FieldConf{
Expand Down Expand Up @@ -68,6 +70,11 @@ var (
IsTag: true,
DataType: dataTypeString,
},
"status_code": {
AttributeKey: []string{otelAttributeStatusCode},
IsTag: false,
DataType: dataTypeInt64,
},
"psm": {
AttributeKey: []string{"service.name"},
IsTag: false,
Expand All @@ -86,11 +93,13 @@ var (
springAIAttributeToolInput,
otelAttributeInput,
apmInput,
otelAttributeToolInput,
},
AttributeKeyPrefix: []string{
openInferenceAttributeModelInputMessages,
openInferenceAttributeToolInput,
string(semconv1_27_0.GenAIPromptKey),
string(semconv.GenAIInputMessagesKey),
},
EventName: []string{otelEventModelSystemMessage, otelEventModelUserMessage, otelEventModelToolMessage, otelEventModelAssistantMessage, otelSpringAIEventModelPrompt},
DataType: dataTypeString,
Expand All @@ -113,10 +122,12 @@ var (
springAIAttributeToolOutput,
otelAttributeOutput,
apmOutput,
otelAttributeToolOutput,
},
AttributeKeyPrefix: []string{
openInferenceAttributeModelOutputMessages,
string(semconv1_27_0.GenAICompletionKey),
string(semconv.GenAIOutputMessagesKey),
},
EventName: []string{otelEventModelChoice, otelSpringAIEventModelCompletion},
DataType: dataTypeString,
Expand Down Expand Up @@ -343,6 +354,11 @@ func OtelSpanConvertToSendSpan(ctx context.Context, spaceID string, resourceScop
}
if conf.IsTag {
tagsLong[fieldKey] = value
} else {
switch fieldKey {
case "status_code":
statusCode = int32(value)
Comment thread Dismissed
}
}
case dataTypeBool:
value, ok := srcValue.(bool)
Expand Down Expand Up @@ -384,7 +400,7 @@ func OtelSpanConvertToSendSpan(ctx context.Context, spaceID string, resourceScop
// set attributes
calOtherAttribute(ctx, span, tagsString, tagsLong, tagsDouble, tagsBool)
// set runtime
calRuntime(systemTagsString, resourceScopeSpan)
calRuntime(systemTagsString, tagsString, resourceScopeSpan)

result := &LoopSpan{
StartTime: startTimeUnixNanoInt64 / 1000,
Expand Down Expand Up @@ -530,12 +546,19 @@ func calOtherAttribute(ctx context.Context, span *Span, tagsString map[string]st
}
}

func calRuntime(systemTagsString map[string]string, resourceScopeSpan *ResourceScopeSpan) {
systemTagsString[tracespec.Runtime_] = getRuntime(resourceScopeSpan)
func calRuntime(systemTagsString map[string]string, tagsString map[string]string, resourceScopeSpan *ResourceScopeSpan) {
systemTagsString[tracespec.Runtime_] = getRuntime(tagsString, resourceScopeSpan)
}

func getRuntime(resourceScopeSpan *ResourceScopeSpan) string {
runtime := processRuntime(resourceScopeSpan)
func getRuntime(tagsString map[string]string, resourceScopeSpan *ResourceScopeSpan) string {
if len(tagsString) > 0 {
if runtime, ok := tagsString[otelAttributeSystemRuntime]; ok && len(runtime) > 0 {
delete(tagsString, otelAttributeSystemRuntime)
return runtime
}
}

runtime := processRuntimeByScope(resourceScopeSpan)
marshalString, err := sonic.MarshalString(runtime)
if err != nil {
return "" // unexpected
Expand All @@ -544,7 +567,7 @@ func getRuntime(resourceScopeSpan *ResourceScopeSpan) string {
return marshalString
}

func processRuntime(resourceScopeSpan *ResourceScopeSpan) *tracespec.Runtime {
func processRuntimeByScope(resourceScopeSpan *ResourceScopeSpan) *tracespec.Runtime {
res := &tracespec.Runtime{
Library: tracespec.VLibOpentelemetry,
Scene: "",
Expand Down Expand Up @@ -667,7 +690,7 @@ func processAttributePrefix(ctx context.Context, fieldKey string, conf FieldConf
}
}
}
case openInferenceAttributeModelInputMessages: // openInference(or litellm) input message
case openInferenceAttributeModelInputMessages, string(semconv.GenAIInputMessagesKey): // openInference(or litellm) or openTelemetry input message
srcInput, err := open_inference.ConvertToModelInput(srcAttrAggrRes)
if err != nil {
continue
Expand All @@ -686,7 +709,7 @@ func processAttributePrefix(ctx context.Context, fieldKey string, conf FieldConf
continue
}
}
case openInferenceAttributeModelOutputMessages: // openInference output message
case openInferenceAttributeModelOutputMessages, string(semconv.GenAIOutputMessagesKey): // openInference(or litellm) or openTelemetry output message
resObject, err := open_inference.ConvertToModelOutput(srcAttrAggrRes)
if err == nil {
toBeMarshalObject = resObject
Expand Down
6 changes: 3 additions & 3 deletions backend/modules/observability/lib/otel/otel_convert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1036,7 +1036,7 @@ func TestProcessRuntime(t *testing.T) {
panic(r) // Re-panic if unexpected
}
}()
result := processRuntime(tt.resourceScopeSpan)
result := processRuntimeByScope(tt.resourceScopeSpan)
tt.validate(t, result)
})
}
Expand Down Expand Up @@ -1093,7 +1093,7 @@ func TestGetRuntime(t *testing.T) {
panic(r) // Re-panic if unexpected
}
}()
result := getRuntime(tt.resourceScopeSpan)
result := getRuntime(nil, tt.resourceScopeSpan)
tt.validate(t, result)
})
}
Expand Down Expand Up @@ -1149,7 +1149,7 @@ func TestCalRuntime(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
calRuntime(tt.systemTagsString, tt.resourceScopeSpan)
calRuntime(tt.systemTagsString, nil, tt.resourceScopeSpan)
tt.validate(t, tt.systemTagsString)
})
}
Expand Down
Loading