Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion backend/modules/observability/lib/otel/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ const (
otelAttributeInput = "cozeloop.input"
otelAttributeOutput = "cozeloop.output"
otelAttributeLogID = "cozeloop.logid"
otelAttributeStatusCode = "cozeloop.status_code"

apmInput = "gen_ai.input"
apmOutput = "gen_ai.output"
Expand All @@ -34,6 +35,9 @@ const (
otelAttributePromptKey = "cozeloop.prompt_key"
otelAttributePromptVersion = "cozeloop.prompt_version"
otelAttributePromptProvider = "cozeloop.prompt_provider"

// system
otelAttributeSystemRuntime = "cozeloop.system_tag_runtime"
)

// openinference attribute key
Expand Down Expand Up @@ -96,7 +100,10 @@ const (

// otel attribute key
const (
otelAttributeModelInputTools = "gen_ai.tool.definitions"
otelAttributeModelInputTools = "gen_ai.tool.definitions" // model tools

otelAttributeToolInput = "gen_ai.tool.call.arguments" // tool input
otelAttributeToolOutput = "gen_ai.tool.call.result"
)

var otelMessageEventNameMap = []string{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,14 @@ func convertModelMsg(msg map[string]interface{}) map[string]interface{} {
modelMsg["reasoning_content"] = c
}

// contents
// contents or parts
var contents []interface{}
if c, ok := msg["contents"].([]interface{}); ok && len(c) > 0 {
contents = c
} else if c, ok := msg["content"].([]interface{}); ok && len(c) > 0 {
contents = c
partsKey := []string{"contents", "content", "parts"}
for _, key := range partsKey {
if parts, ok := msg[key].([]interface{}); ok && len(parts) > 0 {
contents = parts
break
}
}
if len(contents) > 0 {
parts := make([]interface{}, 0, len(contents))
Expand All @@ -84,6 +86,9 @@ func convertModelMsg(msg map[string]interface{}) map[string]interface{} {
}
typ, _ := mcContent["type"]
text, _ := mcContent["text"]
if text == nil {
text, _ = mcContent["content"]
}
image, _ := mcContent["image_url"]
part := map[string]interface{}{}
switch typ {
Expand Down
39 changes: 31 additions & 8 deletions backend/modules/observability/lib/otel/otel_convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@

// FieldConfMap Field configuration, supports configuring data sources and export methods for fields, currently supports attribute, event, is_tag, data_type
// Among them, attributes and events support configuring multiple, while tags and datatypes only support configuring one.
// For AttributeKey and AttributeKeyPrefix, the field at the head has higher priority, and result will be returned once a match is found.
// For Events, the fields have same priority, and result will be fixed by all fields.
// Other types of configurations need to be manually processed in the code.
var (
FieldConfMap = map[string]FieldConf{
Expand Down Expand Up @@ -68,6 +70,11 @@
IsTag: true,
DataType: dataTypeString,
},
"status_code": {
AttributeKey: []string{otelAttributeStatusCode},
IsTag: false,
DataType: dataTypeInt64,
},
"psm": {
AttributeKey: []string{"service.name"},
IsTag: false,
Expand All @@ -86,11 +93,13 @@
springAIAttributeToolInput,
otelAttributeInput,
apmInput,
otelAttributeToolInput,
},
AttributeKeyPrefix: []string{
openInferenceAttributeModelInputMessages,
openInferenceAttributeToolInput,
string(semconv1_27_0.GenAIPromptKey),
string(semconv.GenAIInputMessagesKey),
},
EventName: []string{otelEventModelSystemMessage, otelEventModelUserMessage, otelEventModelToolMessage, otelEventModelAssistantMessage, otelSpringAIEventModelPrompt},
DataType: dataTypeString,
Expand All @@ -113,10 +122,12 @@
springAIAttributeToolOutput,
otelAttributeOutput,
apmOutput,
otelAttributeToolOutput,
},
AttributeKeyPrefix: []string{
openInferenceAttributeModelOutputMessages,
string(semconv1_27_0.GenAICompletionKey),
string(semconv.GenAIOutputMessagesKey),
},
EventName: []string{otelEventModelChoice, otelSpringAIEventModelCompletion},
DataType: dataTypeString,
Expand Down Expand Up @@ -343,6 +354,11 @@
}
if conf.IsTag {
tagsLong[fieldKey] = value
} else {
switch fieldKey {
case "status_code":
statusCode = int32(value)
Comment thread Dismissed
}
}
case dataTypeBool:
value, ok := srcValue.(bool)
Expand Down Expand Up @@ -384,7 +400,7 @@
// set attributes
calOtherAttribute(ctx, span, tagsString, tagsLong, tagsDouble, tagsBool)
// set runtime
calRuntime(systemTagsString, resourceScopeSpan)
calRuntime(systemTagsString, tagsString, resourceScopeSpan)

result := &LoopSpan{
StartTime: startTimeUnixNanoInt64 / 1000,
Expand Down Expand Up @@ -530,12 +546,19 @@
}
}

func calRuntime(systemTagsString map[string]string, resourceScopeSpan *ResourceScopeSpan) {
systemTagsString[tracespec.Runtime_] = getRuntime(resourceScopeSpan)
func calRuntime(systemTagsString map[string]string, tagsString map[string]string, resourceScopeSpan *ResourceScopeSpan) {
systemTagsString[tracespec.Runtime_] = getRuntime(tagsString, resourceScopeSpan)
}

func getRuntime(resourceScopeSpan *ResourceScopeSpan) string {
runtime := processRuntime(resourceScopeSpan)
func getRuntime(tagsString map[string]string, resourceScopeSpan *ResourceScopeSpan) string {
if len(tagsString) > 0 {
if runtime, ok := tagsString[otelAttributeSystemRuntime]; ok && len(runtime) > 0 {
delete(tagsString, otelAttributeSystemRuntime)
return runtime
}
}

runtime := processRuntimeByScope(resourceScopeSpan)
marshalString, err := sonic.MarshalString(runtime)
if err != nil {
return "" // unexpected
Expand All @@ -544,7 +567,7 @@
return marshalString
}

func processRuntime(resourceScopeSpan *ResourceScopeSpan) *tracespec.Runtime {
func processRuntimeByScope(resourceScopeSpan *ResourceScopeSpan) *tracespec.Runtime {
res := &tracespec.Runtime{
Library: tracespec.VLibOpentelemetry,
Scene: "",
Expand Down Expand Up @@ -667,7 +690,7 @@
}
}
}
case openInferenceAttributeModelInputMessages: // openInference(or litellm) input message
case openInferenceAttributeModelInputMessages, string(semconv.GenAIInputMessagesKey): // openInference(or litellm) or openTelemetry input message
srcInput, err := open_inference.ConvertToModelInput(srcAttrAggrRes)
if err != nil {
continue
Expand All @@ -686,7 +709,7 @@
continue
}
}
case openInferenceAttributeModelOutputMessages: // openInference output message
case openInferenceAttributeModelOutputMessages, string(semconv.GenAIOutputMessagesKey): // openInference(or litellm) or openTelemetry output message
resObject, err := open_inference.ConvertToModelOutput(srcAttrAggrRes)
if err == nil {
toBeMarshalObject = resObject
Expand Down
6 changes: 3 additions & 3 deletions backend/modules/observability/lib/otel/otel_convert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1036,7 +1036,7 @@ func TestProcessRuntime(t *testing.T) {
panic(r) // Re-panic if unexpected
}
}()
result := processRuntime(tt.resourceScopeSpan)
result := processRuntimeByScope(tt.resourceScopeSpan)
tt.validate(t, result)
})
}
Expand Down Expand Up @@ -1093,7 +1093,7 @@ func TestGetRuntime(t *testing.T) {
panic(r) // Re-panic if unexpected
}
}()
result := getRuntime(tt.resourceScopeSpan)
result := getRuntime(nil, tt.resourceScopeSpan)
tt.validate(t, result)
})
}
Expand Down Expand Up @@ -1149,7 +1149,7 @@ func TestCalRuntime(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
calRuntime(tt.systemTagsString, tt.resourceScopeSpan)
calRuntime(tt.systemTagsString, nil, tt.resourceScopeSpan)
tt.validate(t, tt.systemTagsString)
})
}
Expand Down
Loading