From bfc91685405a0552b6f95cd968520e108ac667cc Mon Sep 17 00:00:00 2001 From: lizhanpeng Date: Wed, 10 Sep 2025 16:48:17 +0800 Subject: [PATCH] feat(backend): add annotation feature support with ClickHouse table and DAO implementation Co-Authored-By: Coda --- .../observability/infra/repo/ck/annotation.go | 147 ++++++- .../infra/repo/ck/annotation_test.go | 364 ++++++++++++++++++ .../infra/repo/ck/convertor/annotation.go | 86 +++++ .../model/observability_annotation.gen.go | 2 +- .../observability/infra/repo/ck/spans.go | 331 +++++++++++----- .../observability/infra/repo/ck/spans_test.go | 185 ++++++++- .../init-sql/observability_annotations.sql | 32 ++ .../docker-compose/conf/observability.yaml | 12 +- .../init-sql/observability_annotations.sql | 32 ++ .../umbrella/conf/observability.yaml | 23 +- 10 files changed, 1082 insertions(+), 132 deletions(-) create mode 100755 backend/modules/observability/infra/repo/ck/annotation_test.go create mode 100644 backend/modules/observability/infra/repo/ck/convertor/annotation.go create mode 100755 release/deployment/docker-compose/bootstrap/clickhouse-init/init-sql/observability_annotations.sql create mode 100755 release/deployment/helm-chart/charts/app/bootstrap/init/clickhouse/init-sql/observability_annotations.sql diff --git a/backend/modules/observability/infra/repo/ck/annotation.go b/backend/modules/observability/infra/repo/ck/annotation.go index d2018e9975..a9cd3f8523 100644 --- a/backend/modules/observability/infra/repo/ck/annotation.go +++ b/backend/modules/observability/infra/repo/ck/annotation.go @@ -5,9 +5,19 @@ package ck import ( "context" + "fmt" + "strings" + "github.com/coze-dev/coze-loop/backend/infra/backoff" "github.com/coze-dev/coze-loop/backend/infra/ck" + "github.com/coze-dev/coze-loop/backend/modules/observability/infra/repo/ck/convertor" + "github.com/coze-dev/coze-loop/backend/modules/observability/infra/repo/ck/gorm_gen/model" "github.com/coze-dev/coze-loop/backend/modules/observability/infra/repo/dao" + obErrorx "github.com/coze-dev/coze-loop/backend/modules/observability/pkg/errno" + "github.com/coze-dev/coze-loop/backend/pkg/errorx" + "github.com/coze-dev/coze-loop/backend/pkg/logs" + "gorm.io/gorm" + "gorm.io/gorm/clause" ) func NewAnnotationCkDaoImpl(db ck.Provider) (dao.IAnnotationDao, error) { @@ -21,13 +31,146 @@ type AnnotationCkDaoImpl struct { } func (a *AnnotationCkDaoImpl) Insert(ctx context.Context, params *dao.InsertAnnotationParam) error { + if params == nil || len(params.Annotations) == 0 { + return errorx.NewByCode(obErrorx.CommercialCommonInvalidParamCodeCode) + } + db := a.db.NewSession(ctx) + annotations := convertor.AnnotationListPO2CKModels(params.Annotations) + if err := backoff.RetryWithMaxTimes(ctx, 2, func() error { + return db.Table(params.Table).Create(annotations).Error + }); err != nil { + logs.CtxError(ctx, "fail to insert annotations: %v", err) + return errorx.WrapByCode(err, obErrorx.CommercialCommonInternalErrorCodeCode) + } + logs.CtxInfo(ctx, "insert annotations successfully, count: %d", len(params.Annotations)) return nil } func (a *AnnotationCkDaoImpl) Get(ctx context.Context, params *dao.GetAnnotationParam) (*dao.Annotation, error) { - return nil, nil + if params == nil || params.ID == "" { + return nil, errorx.NewByCode(obErrorx.CommercialCommonInvalidParamCodeCode) + } + db, err := a.buildSql(ctx, &annoSqlParam{ + Tables: params.Tables, + StartTime: params.StartTime, + EndTime: params.EndTime, + ID: params.ID, + Limit: 1, + }) + if err != nil { + return nil, err + } + logs.CtxInfo(ctx, "Get Annotation SQL: %s", db.ToSQL(func(tx *gorm.DB) *gorm.DB { + return tx.Find(nil) + })) + var annotations []*model.ObservabilityAnnotation + if err := db.Find(&annotations).Error; err != nil { + return nil, err + } + if len(annotations) == 0 { + return nil, nil + } else if len(annotations) > 1 { + logs.CtxWarn(ctx, "multiple annotations found") + } + return convertor.AnnotationCKModel2PO(annotations[0]), nil } func (a *AnnotationCkDaoImpl) List(ctx context.Context, params *dao.ListAnnotationsParam) ([]*dao.Annotation, error) { - return nil, nil + if params == nil || len(params.SpanIDs) == 0 { + return nil, nil + } + db, err := a.buildSql(ctx, &annoSqlParam{ + Tables: params.Tables, + StartTime: params.StartTime, + EndTime: params.EndTime, + SpanIDs: params.SpanIDs, + DescByUpdatedAt: params.DescByUpdatedAt, + Limit: params.Limit, + }) + if err != nil { + return nil, err + } + logs.CtxInfo(ctx, "List Annotations SQL: %s", db.ToSQL(func(tx *gorm.DB) *gorm.DB { + return tx.Find(nil) + })) + var annotations []*model.ObservabilityAnnotation + if err := db.Find(&annotations).Error; err != nil { + return nil, err + } + return convertor.AnnotationListCKModels2PO(annotations), nil +} + +type annoSqlParam struct { + Tables []string + StartTime int64 + EndTime int64 + ID string + SpanIDs []string + DescByUpdatedAt bool + Limit int32 +} + +func (a *AnnotationCkDaoImpl) buildSql(ctx context.Context, param *annoSqlParam) (*gorm.DB, error) { + db := a.db.NewSession(ctx) + var tableQueries []*gorm.DB + for _, table := range param.Tables { + query, err := a.buildSingleSql(ctx, db, table, param) + if err != nil { + return nil, err + } + tableQueries = append(tableQueries, query) + } + if len(tableQueries) == 0 { + return nil, fmt.Errorf("no table configured") + } else if len(tableQueries) == 1 { + query := tableQueries[0].ToSQL(func(tx *gorm.DB) *gorm.DB { + return tx.Find(nil) + }) + query += " SETTINGS final = 1" + return db.Raw(query), nil + } else { + queries := make([]string, 0) + for i := 0; i < len(tableQueries); i++ { + query := tableQueries[i].ToSQL(func(tx *gorm.DB) *gorm.DB { + return tx.Find(nil) + }) + queries = append(queries, "("+query+")") + } + sql := fmt.Sprintf("SELECT * FROM (%s)", strings.Join(queries, " UNION ALL ")) + if param.DescByUpdatedAt { + sql += " ORDER BY updated_at DESC" + } else { + sql += " ORDER BY created_at ASC" + } + sql += fmt.Sprintf(" LIMIT %d SETTINGS final = 1", param.Limit) + return db.Raw(sql), nil + } +} + +// buildSingleSql 构建单表查询SQL +func (a *AnnotationCkDaoImpl) buildSingleSql(ctx context.Context, db *gorm.DB, tableName string, param *annoSqlParam) (*gorm.DB, error) { + sqlQuery := db. + Table(tableName). + Where("deleted_at = 0") + + if param.ID != "" { + sqlQuery = sqlQuery.Where("id = ?", param.ID) + } + if len(param.SpanIDs) > 0 { + sqlQuery = sqlQuery.Where("span_id IN (?)", param.SpanIDs) + } + sqlQuery = sqlQuery. + Where("start_time >= ?", param.StartTime). + Where("start_time <= ?", param.EndTime) + if param.DescByUpdatedAt { + sqlQuery = sqlQuery.Order(clause.OrderBy{Columns: []clause.OrderByColumn{ + {Column: clause.Column{Name: "updated_at"}, Desc: true}, + }}) + } else { + sqlQuery = sqlQuery.Order(clause.OrderBy{Columns: []clause.OrderByColumn{ + {Column: clause.Column{Name: "created_at"}, Desc: false}, + }}) + } + sqlQuery = sqlQuery.Limit(int(param.Limit)) + return sqlQuery, nil } diff --git a/backend/modules/observability/infra/repo/ck/annotation_test.go b/backend/modules/observability/infra/repo/ck/annotation_test.go new file mode 100755 index 0000000000..1311067490 --- /dev/null +++ b/backend/modules/observability/infra/repo/ck/annotation_test.go @@ -0,0 +1,364 @@ +// Copyright (c) 2025 coze-dev Authors +// SPDX-License-Identifier: Apache-2.0 + +package ck + +import ( + "context" + "errors" + "fmt" + "testing" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/coze-dev/coze-loop/backend/modules/observability/infra/repo/ck/gorm_gen/model" + repodao "github.com/coze-dev/coze-loop/backend/modules/observability/infra/repo/dao" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "gorm.io/driver/clickhouse" + "gorm.io/gorm" +) + +func newInsertAnnotationDao(t *testing.T, failUntil int) (*AnnotationCkDaoImpl, func(), *int) { + t.Helper() + sqlDB, _, err := sqlmock.New() + require.NoError(t, err) + + db, err := gorm.Open(clickhouse.New(clickhouse.Config{ + Conn: sqlDB, + SkipInitializeWithVersion: true, + }), &gorm.Config{SkipDefaultTransaction: true}) + require.NoError(t, err) + + count := 0 + _ = db.Callback().Create().Replace("gorm:create", func(tx *gorm.DB) { + count++ + if count <= failUntil { + tx.Error = errors.New("insert error") + return + } + }) + + provider := &mockCkProvider{db: db.Session(&gorm.Session{DryRun: true})} + return &AnnotationCkDaoImpl{db: provider}, func() { + _ = sqlDB.Close() + }, &count +} + +func newAnnotationDao(t *testing.T) (*AnnotationCkDaoImpl, sqlmock.Sqlmock, *gorm.DB, func()) { + t.Helper() + sqlDB, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherRegexp)) + require.NoError(t, err) + + db, err := gorm.Open(clickhouse.New(clickhouse.Config{ + Conn: sqlDB, + SkipInitializeWithVersion: true, + }), &gorm.Config{SkipDefaultTransaction: true}) + require.NoError(t, err) + + provider := &mockCkProvider{db: db} + return &AnnotationCkDaoImpl{db: provider}, mock, db, func() { + _ = sqlDB.Close() + } +} + +func TestAnnotationCkDaoImpl_Insert(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + t.Run("nil params", func(t *testing.T) { + dao := &AnnotationCkDaoImpl{} + err := dao.Insert(ctx, nil) + assert.Error(t, err) + }) + + t.Run("empty annotations", func(t *testing.T) { + dao := &AnnotationCkDaoImpl{} + err := dao.Insert(ctx, &repodao.InsertAnnotationParam{}) + assert.Error(t, err) + }) + + t.Run("success", func(t *testing.T) { + dao, cleanup, calls := newInsertAnnotationDao(t, 0) + defer cleanup() + annotation := &repodao.Annotation{ID: "anno-1"} + + err := dao.Insert(ctx, &repodao.InsertAnnotationParam{ + Table: "observability_annotations", + Annotations: []*repodao.Annotation{annotation}, + }) + assert.NoError(t, err) + assert.Equal(t, 1, *calls) + }) + + t.Run("retry failed", func(t *testing.T) { + dao, cleanup, calls := newInsertAnnotationDao(t, 3) + defer cleanup() + annotation := &repodao.Annotation{ID: "anno-2"} + + err := dao.Insert(ctx, &repodao.InsertAnnotationParam{ + Table: "observability_annotations", + Annotations: []*repodao.Annotation{annotation}, + }) + assert.Error(t, err) + assert.Equal(t, 3, *calls) + }) +} + +func TestAnnotationCkDaoImpl_Get(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + t.Run("invalid params", func(t *testing.T) { + dao := &AnnotationCkDaoImpl{} + _, err := dao.Get(ctx, &repodao.GetAnnotationParam{}) + assert.Error(t, err) + }) + + t.Run("build sql error", func(t *testing.T) { + dao, _, _, cleanup := newAnnotationDao(t) + defer cleanup() + + _, err := dao.Get(ctx, &repodao.GetAnnotationParam{ + ID: "anno-1", + StartTime: 1, + EndTime: 2, + }) + assert.Error(t, err) + }) + + t.Run("success", func(t *testing.T) { + dao, mock, _, cleanup := newAnnotationDao(t) + defer cleanup() + + rows := sqlmock.NewRows([]string{"id", "span_id"}).AddRow("anno-1", "span-1") + mock.ExpectQuery("SELECT").WillReturnRows(rows) + + anno, err := dao.Get(ctx, &repodao.GetAnnotationParam{ + ID: "anno-1", + Tables: []string{"observability_annotations"}, + StartTime: 1, + EndTime: 2, + }) + assert.NoError(t, err) + assert.NotNil(t, anno) + assert.Equal(t, "anno-1", anno.ID) + assert.Equal(t, "span-1", anno.SpanID) + assert.NoError(t, mock.ExpectationsWereMet()) + }) + + t.Run("multiple results returns first", func(t *testing.T) { + dao, mock, _, cleanup := newAnnotationDao(t) + defer cleanup() + + rows := sqlmock.NewRows([]string{"id", "span_id"}). + AddRow("anno-1", "span-1"). + AddRow("anno-2", "span-2") + mock.ExpectQuery("SELECT").WillReturnRows(rows) + + anno, err := dao.Get(ctx, &repodao.GetAnnotationParam{ + ID: "anno-1", + Tables: []string{"observability_annotations"}, + StartTime: 1, + EndTime: 2, + }) + assert.NoError(t, err) + assert.Equal(t, "anno-1", anno.ID) + assert.NoError(t, mock.ExpectationsWereMet()) + }) + + t.Run("database error", func(t *testing.T) { + dao, mock, _, cleanup := newAnnotationDao(t) + defer cleanup() + + mock.ExpectQuery("SELECT").WillReturnError(fmt.Errorf("db error")) + + _, err := dao.Get(ctx, &repodao.GetAnnotationParam{ + ID: "anno-1", + Tables: []string{"observability_annotations"}, + StartTime: 1, + EndTime: 2, + }) + assert.Error(t, err) + assert.NoError(t, mock.ExpectationsWereMet()) + }) +} + +func TestAnnotationCkDaoImpl_List(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + t.Run("nil params", func(t *testing.T) { + dao := &AnnotationCkDaoImpl{} + annos, err := dao.List(ctx, nil) + assert.NoError(t, err) + assert.Nil(t, annos) + }) + + t.Run("empty span ids", func(t *testing.T) { + dao := &AnnotationCkDaoImpl{} + annos, err := dao.List(ctx, &repodao.ListAnnotationsParam{}) + assert.NoError(t, err) + assert.Nil(t, annos) + }) + + t.Run("success", func(t *testing.T) { + dao, mock, _, cleanup := newAnnotationDao(t) + defer cleanup() + + rows := sqlmock.NewRows([]string{"id", "span_id"}).AddRow("anno-1", "span-1") + mock.ExpectQuery("SELECT").WillReturnRows(rows) + + annos, err := dao.List(ctx, &repodao.ListAnnotationsParam{ + Tables: []string{"observability_annotations"}, + SpanIDs: []string{"span-1"}, + StartTime: 1, + EndTime: 2, + Limit: 10, + }) + assert.NoError(t, err) + require.Len(t, annos, 1) + assert.Equal(t, "span-1", annos[0].SpanID) + assert.NoError(t, mock.ExpectationsWereMet()) + }) + + t.Run("database error", func(t *testing.T) { + dao, mock, _, cleanup := newAnnotationDao(t) + defer cleanup() + + mock.ExpectQuery("SELECT").WillReturnError(fmt.Errorf("db error")) + + _, err := dao.List(ctx, &repodao.ListAnnotationsParam{ + Tables: []string{"observability_annotations"}, + SpanIDs: []string{"span-1"}, + StartTime: 1, + EndTime: 2, + Limit: 10, + }) + assert.Error(t, err) + assert.NoError(t, mock.ExpectationsWereMet()) + }) + + t.Run("build sql error", func(t *testing.T) { + dao, _, _, cleanup := newAnnotationDao(t) + defer cleanup() + + _, err := dao.List(ctx, &repodao.ListAnnotationsParam{ + SpanIDs: []string{"span-1"}, + StartTime: 1, + EndTime: 2, + }) + assert.Error(t, err) + }) +} + +func TestAnnotationCkDaoImpl_buildSingleSql(t *testing.T) { + t.Parallel() + + dao, _, db, cleanup := newAnnotationDao(t) + defer cleanup() + + ctx := context.Background() + baseSession := dao.db.NewSession(ctx) + require.NotNil(t, baseSession) + + testCases := []struct { + name string + param *annoSqlParam + assert func(t *testing.T, sql string) + }{ + { + name: "with id filter", + param: &annoSqlParam{ + Tables: []string{"observability_annotations"}, + ID: "anno-1", + StartTime: 1, + EndTime: 2, + Limit: 10, + }, + assert: func(t *testing.T, sql string) { + assert.Contains(t, sql, "FROM `observability_annotations`") + assert.Contains(t, sql, "id = 'anno-1'") + assert.Contains(t, sql, "ORDER BY `created_at`") + assert.Contains(t, sql, "LIMIT 10") + }, + }, + { + name: "with span ids and desc", + param: &annoSqlParam{ + Tables: []string{"observability_annotations"}, + SpanIDs: []string{"span-1"}, + StartTime: 10, + EndTime: 20, + Limit: 5, + DescByUpdatedAt: true, + }, + assert: func(t *testing.T, sql string) { + assert.Contains(t, sql, "span_id IN ('span-1')") + assert.Contains(t, sql, "ORDER BY `updated_at` DESC") + assert.Contains(t, sql, "LIMIT 5") + }, + }, + } + + for _, tc := range testCases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + session := baseSession.Session(&gorm.Session{DryRun: true}) + query, err := dao.buildSingleSql(ctx, session, tc.param.Tables[0], tc.param) + require.NoError(t, err) + sql := query.ToSQL(func(tx *gorm.DB) *gorm.DB { + return tx.Find([]*model.ObservabilityAnnotation{}) + }) + tc.assert(t, sql) + }) + } + + _ = db // silence unused (db kept for completeness) +} + +func TestAnnotationCkDaoImpl_buildSql(t *testing.T) { + t.Parallel() + + dao, _, _, cleanup := newAnnotationDao(t) + defer cleanup() + + ctx := context.Background() + + t.Run("no tables", func(t *testing.T) { + _, err := dao.buildSql(ctx, &annoSqlParam{}) + assert.Error(t, err) + }) + + t.Run("single table", func(t *testing.T) { + result, err := dao.buildSql(ctx, &annoSqlParam{ + Tables: []string{"observability_annotations"}, + StartTime: 1, + EndTime: 2, + Limit: 3, + }) + assert.NoError(t, err) + sql := result.Statement.SQL.String() + assert.Contains(t, sql, "FROM `observability_annotations`") + assert.Contains(t, sql, "LIMIT 3") + assert.Contains(t, sql, "SETTINGS final = 1") + }) + + t.Run("multiple tables", func(t *testing.T) { + result, err := dao.buildSql(ctx, &annoSqlParam{ + Tables: []string{"observability_annotations", "observability_annotations_v2"}, + StartTime: 1, + EndTime: 2, + Limit: 5, + DescByUpdatedAt: true, + }) + assert.NoError(t, err) + sql := result.Statement.SQL.String() + assert.Contains(t, sql, "UNION ALL") + assert.Contains(t, sql, "ORDER BY updated_at DESC") + assert.Contains(t, sql, "LIMIT 5") + assert.Contains(t, sql, "SETTINGS final = 1") + }) +} diff --git a/backend/modules/observability/infra/repo/ck/convertor/annotation.go b/backend/modules/observability/infra/repo/ck/convertor/annotation.go new file mode 100644 index 0000000000..9132a5201e --- /dev/null +++ b/backend/modules/observability/infra/repo/ck/convertor/annotation.go @@ -0,0 +1,86 @@ +// Copyright (c) 2025 coze-dev Authors +// SPDX-License-Identifier: Apache-2.0 +package convertor + +import ( + "github.com/coze-dev/coze-loop/backend/modules/observability/infra/repo/ck/gorm_gen/model" + "github.com/coze-dev/coze-loop/backend/modules/observability/infra/repo/dao" +) + +func AnnotationListCKModels2PO(annotations []*model.ObservabilityAnnotation) []*dao.Annotation { + ret := make([]*dao.Annotation, len(annotations)) + for i, annotation := range annotations { + ret[i] = AnnotationCKModel2PO(annotation) + } + return ret +} + +func AnnotationListPO2CKModels(annotations []*dao.Annotation) []*model.ObservabilityAnnotation { + ret := make([]*model.ObservabilityAnnotation, len(annotations)) + for i, annotation := range annotations { + ret[i] = AnnotationPO2CKModel(annotation) + } + return ret +} + +func AnnotationPO2CKModel(anno *dao.Annotation) *model.ObservabilityAnnotation { + if anno == nil { + return nil + } + return &model.ObservabilityAnnotation{ + ID: anno.ID, + SpanID: anno.SpanID, + TraceID: anno.TraceID, + StartTime: anno.StartTime, + SpaceID: anno.SpaceID, + AnnotationType: anno.AnnotationType, + AnnotationIndex: anno.AnnotationIndex, + Key: anno.Key, + ValueType: anno.ValueType, + ValueFloat: anno.ValueFloat, + ValueString: anno.ValueString, + ValueBool: anno.ValueBool, + ValueLong: anno.ValueLong, + Reasoning: anno.Reasoning, + Correction: anno.Correction, + Metadata: anno.Metadata, + Status: anno.Status, + CreatedBy: anno.CreatedBy, + CreatedAt: anno.CreatedAt, + UpdatedBy: anno.UpdatedBy, + UpdatedAt: anno.UpdatedAt, + DeletedAt: anno.DeletedAt, + StartDate: anno.StartDate, + } +} + +func AnnotationCKModel2PO(anno *model.ObservabilityAnnotation) *dao.Annotation { + if anno == nil { + return nil + } + return &dao.Annotation{ + ID: anno.ID, + SpanID: anno.SpanID, + TraceID: anno.TraceID, + StartTime: anno.StartTime, + SpaceID: anno.SpaceID, + AnnotationType: anno.AnnotationType, + AnnotationIndex: anno.AnnotationIndex, + Key: anno.Key, + ValueType: anno.ValueType, + ValueFloat: anno.ValueFloat, + ValueString: anno.ValueString, + ValueBool: anno.ValueBool, + ValueLong: anno.ValueLong, + Reasoning: anno.Reasoning, + Correction: anno.Correction, + Metadata: anno.Metadata, + Status: anno.Status, + CreatedBy: anno.CreatedBy, + CreatedAt: anno.CreatedAt, + UpdatedBy: anno.UpdatedBy, + UpdatedAt: anno.UpdatedAt, + DeletedAt: anno.DeletedAt, + StartDate: anno.StartDate, + } +} diff --git a/backend/modules/observability/infra/repo/ck/gorm_gen/model/observability_annotation.gen.go b/backend/modules/observability/infra/repo/ck/gorm_gen/model/observability_annotation.gen.go index ca7fd9a34d..3c4d914e30 100644 --- a/backend/modules/observability/infra/repo/ck/gorm_gen/model/observability_annotation.gen.go +++ b/backend/modules/observability/infra/repo/ck/gorm_gen/model/observability_annotation.gen.go @@ -4,7 +4,7 @@ package model -const TableNameAnnotation = "observability_annotation" +const TableNameAnnotation = "observability_annotations" // ObservabilityAnnotation mapped from table type ObservabilityAnnotation struct { diff --git a/backend/modules/observability/infra/repo/ck/spans.go b/backend/modules/observability/infra/repo/ck/spans.go index 49fdda04ae..da6e1aaabe 100644 --- a/backend/modules/observability/infra/repo/ck/spans.go +++ b/backend/modules/observability/infra/repo/ck/spans.go @@ -27,6 +27,12 @@ import ( const ( TraceStorageTypeCK = "ck" + + // 人工标注标签 + AnnotationManualFeedbackFieldPrefix = "manual_feedback_" + + // 人工标注标签类型 + AnnotationManualFeedbackType = "manual_feedback" ) func NewSpansCkDaoImpl(db ck.Provider) (dao.ISpansDao, error) { @@ -74,6 +80,12 @@ func (s *SpansCkDaoImpl) Get(ctx context.Context, param *dao.QueryParam) ([]*dao if err := sql.Find(&spans).Error; err != nil { return nil, errorx.WrapByCode(err, obErrorx.CommercialCommonRPCErrorCodeCode) } + for _, span := range spans { + if span.SystemTagsString == nil { + span.SystemTagsString = make(map[string]string) + } + span.SystemTagsString[loop_span.SpanFieldTenant] = "cozeloop" // tenant + } return convertor.SpanListCKModels2PO(spans), nil } @@ -170,11 +182,27 @@ func (s *SpansCkDaoImpl) formatAggregationExpression(ctx context.Context, dimens return fmt.Sprintf(dimension.Expression.Expression, replacements...), nil } +type buildSqlParam struct { + spanTable string + annoTable string + queryParam *dao.QueryParam + db *gorm.DB + selectColumns []string + omitColumns []string +} + func (s *SpansCkDaoImpl) buildSql(ctx context.Context, param *dao.QueryParam) (*gorm.DB, error) { db := s.newSession(ctx) var tableQueries []*gorm.DB for _, table := range param.Tables { - query, err := s.buildSingleSql(ctx, db, table, param) + query, err := s.buildSingleSql(ctx, &buildSqlParam{ + spanTable: table, + annoTable: param.AnnoTableMap[table], + queryParam: param, + db: db, + selectColumns: param.SelectColumns, + omitColumns: param.OmitColumns, + }) if err != nil { return nil, err } @@ -203,43 +231,43 @@ func (s *SpansCkDaoImpl) buildSql(ctx context.Context, param *dao.QueryParam) (* } } -func (s *SpansCkDaoImpl) buildSingleSql(ctx context.Context, db *gorm.DB, tableName string, param *dao.QueryParam) (*gorm.DB, error) { - sqlQuery, err := s.buildSqlForFilterFields(ctx, db, param.Filters) +func (s *SpansCkDaoImpl) buildSingleSql(ctx context.Context, param *buildSqlParam) (*gorm.DB, error) { + sqlQuery, err := s.buildSqlForFilterFields(ctx, param, param.queryParam.Filters) if err != nil { return nil, err } queryColumns := lo.Ternary( - len(param.SelectColumns) == 0, - getColumnStr(spanColumns, param.OmitColumns), - getColumnStr(param.SelectColumns, param.OmitColumns), + len(param.selectColumns) == 0, + getColumnStr(spanColumns, param.omitColumns), + getColumnStr(param.selectColumns, param.omitColumns), ) - sqlQuery = db. - Table(tableName).Select(queryColumns). + sqlQuery = param.db. + Table(param.spanTable).Select(queryColumns). Where(sqlQuery). - Where("start_time >= ?", param.StartTime). - Where("start_time <= ?", param.EndTime) - if param.OrderByStartTime { + Where("start_time >= ?", param.queryParam.StartTime). + Where("start_time <= ?", param.queryParam.EndTime) + if param.queryParam.OrderByStartTime { sqlQuery = sqlQuery.Order(clause.OrderBy{Columns: []clause.OrderByColumn{ {Column: clause.Column{Name: "start_time"}, Desc: true}, {Column: clause.Column{Name: "span_id"}, Desc: true}, }}) } - sqlQuery = sqlQuery.Limit(int(param.Limit)) + sqlQuery = sqlQuery.Limit(int(param.queryParam.Limit)) return sqlQuery, nil } // chain -func (s *SpansCkDaoImpl) buildSqlForFilterFields(ctx context.Context, db *gorm.DB, filter *loop_span.FilterFields) (*gorm.DB, error) { +func (s *SpansCkDaoImpl) buildSqlForFilterFields(ctx context.Context, param *buildSqlParam, filter *loop_span.FilterFields) (*gorm.DB, error) { if filter == nil { - return db, nil + return param.db, nil } - queryChain := db + queryChain := param.db if filter.QueryAndOr != nil && *filter.QueryAndOr == loop_span.QueryAndOrEnumOr { for _, subFilter := range filter.FilterFields { if subFilter == nil { continue } - subSql, err := s.buildSqlForFilterField(ctx, db, subFilter) + subSql, err := s.buildSqlForFilterField(ctx, param, subFilter) if err != nil { return nil, err } @@ -250,7 +278,7 @@ func (s *SpansCkDaoImpl) buildSqlForFilterFields(ctx context.Context, db *gorm.D if subFilter == nil { continue } - subSql, err := s.buildSqlForFilterField(ctx, db, subFilter) + subSql, err := s.buildSqlForFilterField(ctx, param, subFilter) if err != nil { return nil, err } @@ -260,89 +288,32 @@ func (s *SpansCkDaoImpl) buildSqlForFilterFields(ctx context.Context, db *gorm.D return queryChain, nil } -func (s *SpansCkDaoImpl) buildSqlForFilterField(ctx context.Context, db *gorm.DB, filter *loop_span.FilterField) (*gorm.DB, error) { - queryChain := db - if filter.FieldName != "" { - if filter.QueryType == nil { - return nil, fmt.Errorf("query type is required, not supposed to be here") +func (s *SpansCkDaoImpl) buildSqlForFilterField(ctx context.Context, param *buildSqlParam, filter *loop_span.FilterField) (*gorm.DB, error) { + queryChain := param.db + if s.isAnnotationFilter(filter.FieldName) { + annoSql, err := s.buildAnnotationSql(ctx, param, filter) + if err != nil { + return nil, fmt.Errorf("failed to build annotation sql: %v", err) } + queryChain = queryChain.Where(annoSql) + } else if filter.FieldName != "" { fieldName, err := s.convertFieldName(ctx, filter) if err != nil { return nil, err } - fieldValues, err := convertFieldValue(filter) + sql, err := s.buildFieldCondition(ctx, param.db, &loop_span.FilterField{ + FieldName: fieldName, + FieldType: filter.FieldType, + Values: filter.Values, + QueryType: filter.QueryType, + }) if err != nil { - return nil, err - } - switch *filter.QueryType { - case loop_span.QueryTypeEnumMatch: - if len(fieldValues) != 1 { - return nil, fmt.Errorf("filter field %s should have one value", filter.FieldName) - } - queryChain = queryChain.Where(fmt.Sprintf("%s like ?", fieldName), fmt.Sprintf("%%%v%%", fieldValues[0])) - case loop_span.QueryTypeEnumNotMatch: - if len(fieldValues) != 1 { - return nil, fmt.Errorf("filter field %s should have one value", filter.FieldName) - } - queryChain = queryChain.Where(fmt.Sprintf("%s NOT like ?", fieldName), fmt.Sprintf("%%%v%%", fieldValues[0])) - case loop_span.QueryTypeEnumEq: - if len(fieldValues) != 1 { - return nil, fmt.Errorf("filter field %s should have one value", filter.FieldName) - } - queryChain = queryChain.Where(fmt.Sprintf("%s = ?", fieldName), fieldValues[0]) - case loop_span.QueryTypeEnumNotEq: - if len(fieldValues) != 1 { - return nil, fmt.Errorf("filter field %s should have one value", filter.FieldName) - } - queryChain = queryChain.Where(fmt.Sprintf("%s != ?", fieldName), fieldValues[0]) - case loop_span.QueryTypeEnumLte: - if len(fieldValues) != 1 { - return nil, fmt.Errorf("filter field %s should have one value", filter.FieldName) - } - queryChain = queryChain.Where(fmt.Sprintf("%s <= ?", fieldName), fieldValues[0]) - case loop_span.QueryTypeEnumGte: - if len(fieldValues) != 1 { - return nil, fmt.Errorf("filter field %s should have one value", filter.FieldName) - } - queryChain = queryChain.Where(fmt.Sprintf("%s >= ?", fieldName), fieldValues[0]) - case loop_span.QueryTypeEnumLt: - if len(fieldValues) != 1 { - return nil, fmt.Errorf("filter field %s should have one value", filter.FieldName) - } - queryChain = queryChain.Where(fmt.Sprintf("%s < ?", fieldName), fieldValues[0]) - case loop_span.QueryTypeEnumGt: - if len(fieldValues) != 1 { - return nil, fmt.Errorf("filter field %s should have one value", filter.FieldName) - } - queryChain = queryChain.Where(fmt.Sprintf("%s > ?", fieldName), fieldValues[0]) - case loop_span.QueryTypeEnumExist: - defaultVal := getFieldDefaultValue(filter) - queryChain = queryChain. - Where(fmt.Sprintf("%s IS NOT NULL", fieldName)). - Where(fmt.Sprintf("%s != ?", fieldName), defaultVal) - case loop_span.QueryTypeEnumNotExist: - defaultVal := getFieldDefaultValue(filter) - queryChain = queryChain. - Where(fmt.Sprintf("%s IS NULL", fieldName)). - Or(fmt.Sprintf("%s = ?", fieldName), defaultVal) - case loop_span.QueryTypeEnumIn: - if len(fieldValues) < 1 { - return nil, fmt.Errorf("filter field %s should have at least one value", filter.FieldName) - } - queryChain = queryChain.Where(fmt.Sprintf("%s IN (?)", fieldName), fieldValues) - case loop_span.QueryTypeEnumNotIn: - if len(fieldValues) < 1 { - return nil, fmt.Errorf("filter field %s should have at least one value", filter.FieldName) - } - queryChain = queryChain.Where(fmt.Sprintf("%s NOT IN (?)", fieldName), fieldValues) - case loop_span.QueryTypeEnumAlwaysTrue: - queryChain = queryChain.Where("1 = 1") - default: - return nil, fmt.Errorf("filter field type %s not supported", filter.FieldType) + return nil, fmt.Errorf("failed to build field condition: %v", err) } + queryChain = queryChain.Where(sql) } if filter.SubFilter != nil { - subQuery, err := s.buildSqlForFilterFields(ctx, db, filter.SubFilter) + subQuery, err := s.buildSqlForFilterFields(ctx, param, filter.SubFilter) if err != nil { return nil, err } @@ -355,6 +326,154 @@ func (s *SpansCkDaoImpl) buildSqlForFilterField(ctx context.Context, db *gorm.DB return queryChain, nil } +func (s *SpansCkDaoImpl) buildFieldCondition(ctx context.Context, db *gorm.DB, filter *loop_span.FilterField) (*gorm.DB, error) { + queryChain := db + if filter.QueryType == nil { + return nil, fmt.Errorf("query type is required, not supposed to be here") + } + fieldValues, err := convertFieldValue(filter) + if err != nil { + return nil, err + } + switch *filter.QueryType { + case loop_span.QueryTypeEnumMatch: + if len(fieldValues) != 1 { + return nil, fmt.Errorf("filter field %s should have one value", filter.FieldName) + } + queryChain = queryChain.Where(fmt.Sprintf("%s like ?", filter.FieldName), fmt.Sprintf("%%%v%%", fieldValues[0])) + case loop_span.QueryTypeEnumNotMatch: + if len(fieldValues) != 1 { + return nil, fmt.Errorf("filter field %s should have one value", filter.FieldName) + } + queryChain = queryChain.Where(fmt.Sprintf("%s NOT like ?", filter.FieldName), fmt.Sprintf("%%%v%%", fieldValues[0])) + case loop_span.QueryTypeEnumEq: + if len(fieldValues) != 1 { + return nil, fmt.Errorf("filter field %s should have one value", filter.FieldName) + } + queryChain = queryChain.Where(fmt.Sprintf("%s = ?", filter.FieldName), fieldValues[0]) + case loop_span.QueryTypeEnumNotEq: + if len(fieldValues) != 1 { + return nil, fmt.Errorf("filter field %s should have one value", filter.FieldName) + } + queryChain = queryChain.Where(fmt.Sprintf("%s != ?", filter.FieldName), fieldValues[0]) + case loop_span.QueryTypeEnumLte: + if len(fieldValues) != 1 { + return nil, fmt.Errorf("filter field %s should have one value", filter.FieldName) + } + queryChain = queryChain.Where(fmt.Sprintf("%s <= ?", filter.FieldName), fieldValues[0]) + case loop_span.QueryTypeEnumGte: + if len(fieldValues) != 1 { + return nil, fmt.Errorf("filter field %s should have one value", filter.FieldName) + } + queryChain = queryChain.Where(fmt.Sprintf("%s >= ?", filter.FieldName), fieldValues[0]) + case loop_span.QueryTypeEnumLt: + if len(fieldValues) != 1 { + return nil, fmt.Errorf("filter field %s should have one value", filter.FieldName) + } + queryChain = queryChain.Where(fmt.Sprintf("%s < ?", filter.FieldName), fieldValues[0]) + case loop_span.QueryTypeEnumGt: + if len(fieldValues) != 1 { + return nil, fmt.Errorf("filter field %s should have one value", filter.FieldName) + } + queryChain = queryChain.Where(fmt.Sprintf("%s > ?", filter.FieldName), fieldValues[0]) + case loop_span.QueryTypeEnumExist: + defaultVal := getFieldDefaultValue(filter) + queryChain = queryChain. + Where(fmt.Sprintf("%s IS NOT NULL", filter.FieldName)). + Where(fmt.Sprintf("%s != ?", filter.FieldName), defaultVal) + case loop_span.QueryTypeEnumNotExist: + defaultVal := getFieldDefaultValue(filter) + queryChain = queryChain. + Where(fmt.Sprintf("%s IS NULL", filter.FieldName)). + Or(fmt.Sprintf("%s = ?", filter.FieldName), defaultVal) + case loop_span.QueryTypeEnumIn: + if len(fieldValues) < 1 { + return nil, fmt.Errorf("filter field %s should have at least one value", filter.FieldName) + } + queryChain = queryChain.Where(fmt.Sprintf("%s IN (?)", filter.FieldName), fieldValues) + case loop_span.QueryTypeEnumNotIn: + if len(fieldValues) < 1 { + return nil, fmt.Errorf("filter field %s should have at least one value", filter.FieldName) + } + queryChain = queryChain.Where(fmt.Sprintf("%s NOT IN (?)", filter.FieldName), fieldValues) + case loop_span.QueryTypeEnumAlwaysTrue: + queryChain = queryChain.Where("1 = 1") + default: + return nil, fmt.Errorf("filter field type %s not supported", filter.FieldType) + } + return queryChain, nil +} + +func (s *SpansCkDaoImpl) isAnnotationFilter(fieldName string) bool { + if strings.HasPrefix(fieldName, AnnotationManualFeedbackFieldPrefix) { + return true + } else { + return false + } +} + +func (s *SpansCkDaoImpl) buildAnnotationSql(ctx context.Context, param *buildSqlParam, filter *loop_span.FilterField) (*gorm.DB, error) { + queryChain := param.db + fieldName := filter.FieldName + if strings.HasPrefix(fieldName, AnnotationManualFeedbackFieldPrefix) { + // manual_feedback_{tag_key_id} + tagKeyId := fieldName[len(AnnotationManualFeedbackFieldPrefix):] + if tagKeyId == "" { + return nil, fmt.Errorf("invalid manual feedback field name %s", fieldName) + } + queryChain = queryChain. + Where("annotation_type = ?", AnnotationManualFeedbackType). + Where("key = ?", tagKeyId) + } else { + return nil, fmt.Errorf("field name %s not supported for annotation, not supposed to be here", fieldName) + } + if filter.QueryType != nil && *filter.QueryType != loop_span.QueryTypeEnumExist { + condition := &loop_span.FilterField{ + FieldType: filter.FieldType, + Values: filter.Values, + QueryType: filter.QueryType, + } + switch filter.FieldType { + case loop_span.FieldTypeString: + condition.FieldName = "value_string" + case loop_span.FieldTypeLong: + condition.FieldName = "value_long" + case loop_span.FieldTypeDouble: + condition.FieldName = "value_float" + case loop_span.FieldTypeBool: + condition.FieldName = "value_bool" + default: + return nil, fmt.Errorf("field type %s not supported", filter.FieldType) + } + fieldSql, err := s.buildFieldCondition(ctx, param.db, condition) + if err != nil { + return nil, err + } + queryChain = queryChain.Where(fieldSql) + } + _ = param.queryParam.Filters.Traverse(func(f *loop_span.FilterField) error { + if f.FieldName == loop_span.SpanFieldSpaceId { + commonSql, err := s.buildFieldCondition(ctx, param.db, f) + if err != nil { + return err + } + queryChain = queryChain.Where(commonSql) + } + return nil + }) + subsql := param.db. + Table(param.annoTable). + Select("span_id"). + Where(queryChain). + Where("deleted_at = 0"). + Where("start_time >= ?", param.queryParam.StartTime). + Where("start_time <= ?", param.queryParam.EndTime) + query := subsql.ToSQL(func(tx *gorm.DB) *gorm.DB { + return tx.Find(nil) + }) + return param.db.Where("span_id in (?)", param.db.Raw(query+" SETTINGS final = 1")), nil +} + func (s *SpansCkDaoImpl) getSuperFieldsMap(ctx context.Context) map[string]bool { return defSuperFieldsMap } @@ -396,11 +515,23 @@ func (s *SpansCkDaoImpl) convertFieldName(ctx context.Context, filter *loop_span } switch filter.FieldType { case loop_span.FieldTypeString: - return fmt.Sprintf("tags_string['%s']", filter.FieldName), nil + if filter.IsSystem { + return fmt.Sprintf("system_tags_string['%s']", filter.FieldName), nil + } else { + return fmt.Sprintf("tags_string['%s']", filter.FieldName), nil + } case loop_span.FieldTypeLong: - return fmt.Sprintf("tags_long['%s']", filter.FieldName), nil + if filter.IsSystem { + return fmt.Sprintf("system_tags_long['%s']", filter.FieldName), nil + } else { + return fmt.Sprintf("tags_long['%s']", filter.FieldName), nil + } case loop_span.FieldTypeDouble: - return fmt.Sprintf("tags_float['%s']", filter.FieldName), nil + if filter.IsSystem { + return fmt.Sprintf("system_tags_double['%s']", filter.FieldName), nil + } else { + return fmt.Sprintf("tags_float['%s']", filter.FieldName), nil + } case loop_span.FieldTypeBool: return fmt.Sprintf("tags_bool['%s']", filter.FieldName), nil default: // not expected to be here @@ -516,6 +647,12 @@ var spanColumns = []string{ "logic_delete_date", } +var validColumnRegex = regexp.MustCompile(`^[a-zA-Z_][a-zA-Z0-9_.]*$`) + +func isSafeColumnName(name string) bool { + return validColumnRegex.MatchString(name) +} + var defSuperFieldsMap = map[string]bool{ loop_span.SpanFieldStartTime: true, loop_span.SpanFieldSpanId: true, @@ -536,12 +673,6 @@ var defSuperFieldsMap = map[string]bool{ loop_span.SpanFieldLogicDeleteDate: true, } -var validColumnRegex = regexp.MustCompile(`^[a-zA-Z_][a-zA-Z0-9_.]*$`) - -func isSafeColumnName(name string) bool { - return validColumnRegex.MatchString(name) -} - func getTimeInterval(granularity metrics_entity.MetricGranularity) string { switch granularity { case metrics_entity.MetricGranularity1Min: diff --git a/backend/modules/observability/infra/repo/ck/spans_test.go b/backend/modules/observability/infra/repo/ck/spans_test.go index 754abc6864..ac147be12e 100644 --- a/backend/modules/observability/infra/repo/ck/spans_test.go +++ b/backend/modules/observability/infra/repo/ck/spans_test.go @@ -302,9 +302,15 @@ func TestBuildSql(t *testing.T) { Values: []string{}, QueryType: ptr.Of(loop_span.QueryTypeEnumNotExist), }, + { + FieldName: "custom_tag_long2", + FieldType: loop_span.FieldTypeLong, + Values: []string{}, + QueryType: ptr.Of(loop_span.QueryTypeEnumExist), + }, }, }, - expectedSql: "SELECT start_time, logid, span_id, trace_id, parent_id, duration, psm, call_type, space_id, span_type, span_name, method, status_code, input, output, object_storage, system_tags_string, system_tags_long, system_tags_float, tags_string, tags_long, tags_bool, tags_float, tags_byte, reserve_create_time, logic_delete_date FROM `observability_spans` WHERE ((tags_string['custom_tag_string'] IS NULL OR tags_string['custom_tag_string'] = '') AND (tags_bool['custom_tag_bool'] IS NULL OR tags_bool['custom_tag_bool'] = 0) AND (tags_float['custom_tag_double'] IS NULL OR tags_float['custom_tag_double'] = 0) AND (tags_long['custom_tag_long'] IS NULL OR tags_long['custom_tag_long'] = 0)) AND start_time >= 1 AND start_time <= 2 LIMIT 100", + expectedSql: "SELECT start_time, logid, span_id, trace_id, parent_id, duration, psm, call_type, space_id, span_type, span_name, method, status_code, input, output, object_storage, system_tags_string, system_tags_long, system_tags_float, tags_string, tags_long, tags_bool, tags_float, tags_byte, reserve_create_time, logic_delete_date FROM `observability_spans` WHERE ((tags_string['custom_tag_string'] IS NULL OR tags_string['custom_tag_string'] = '') AND (tags_bool['custom_tag_bool'] IS NULL OR tags_bool['custom_tag_bool'] = 0) AND (tags_float['custom_tag_double'] IS NULL OR tags_float['custom_tag_double'] = 0) AND (tags_long['custom_tag_long'] IS NULL OR tags_long['custom_tag_long'] = 0) AND (tags_long['custom_tag_long2'] IS NOT NULL AND tags_long['custom_tag_long2'] != 0)) AND start_time >= 1 AND start_time <= 2 LIMIT 100", }, { filter: &loop_span.FilterFields{ @@ -345,6 +351,56 @@ func TestBuildSql(t *testing.T) { }, expectedSql: "SELECT start_time, logid, span_id, trace_id, parent_id, duration, psm, call_type, space_id, span_type, span_name, method, status_code, input, output, object_storage, system_tags_string, system_tags_long, system_tags_float, tags_string, tags_long, tags_bool, tags_float, tags_byte, reserve_create_time, logic_delete_date FROM `observability_spans` WHERE `duration` >= 121 AND start_time >= 1 AND start_time <= 2 LIMIT 100", }, + { + filter: &loop_span.FilterFields{ + FilterFields: []*loop_span.FilterField{ + { + FieldName: loop_span.SpanFieldDuration, + FieldType: loop_span.FieldTypeLong, + Values: []string{"121"}, + QueryType: ptr.Of(loop_span.QueryTypeEnumGt), + }, + }, + }, + expectedSql: "SELECT start_time, logid, span_id, trace_id, parent_id, duration, psm, call_type, space_id, span_type, span_name, method, status_code, input, output, object_storage, system_tags_string, system_tags_long, system_tags_float, tags_string, tags_long, tags_bool, tags_float, tags_byte, reserve_create_time, logic_delete_date FROM `observability_spans` WHERE `duration` > 121 AND start_time >= 1 AND start_time <= 2 LIMIT 100", + }, + { + filter: &loop_span.FilterFields{ + FilterFields: []*loop_span.FilterField{ + { + FieldName: loop_span.SpanFieldDuration, + FieldType: loop_span.FieldTypeLong, + Values: []string{"121"}, + QueryType: ptr.Of(loop_span.QueryTypeEnumLte), + }, + }, + }, + expectedSql: "SELECT start_time, logid, span_id, trace_id, parent_id, duration, psm, call_type, space_id, span_type, span_name, method, status_code, input, output, object_storage, system_tags_string, system_tags_long, system_tags_float, tags_string, tags_long, tags_bool, tags_float, tags_byte, reserve_create_time, logic_delete_date FROM `observability_spans` WHERE `duration` <= 121 AND start_time >= 1 AND start_time <= 2 LIMIT 100", + }, + { + filter: &loop_span.FilterFields{ + FilterFields: []*loop_span.FilterField{ + { + FieldName: loop_span.SpanFieldDuration, + FieldType: loop_span.FieldTypeLong, + Values: []string{"121"}, + QueryType: ptr.Of(loop_span.QueryTypeEnumLt), + }, + }, + }, + expectedSql: "SELECT start_time, logid, span_id, trace_id, parent_id, duration, psm, call_type, space_id, span_type, span_name, method, status_code, input, output, object_storage, system_tags_string, system_tags_long, system_tags_float, tags_string, tags_long, tags_bool, tags_float, tags_byte, reserve_create_time, logic_delete_date FROM `observability_spans` WHERE `duration` < 121 AND start_time >= 1 AND start_time <= 2 LIMIT 100", + }, + { + filter: &loop_span.FilterFields{ + FilterFields: []*loop_span.FilterField{ + { + FieldName: "a", + QueryType: ptr.Of(loop_span.QueryTypeEnumAlwaysTrue), + }, + }, + }, + expectedSql: "SELECT start_time, logid, span_id, trace_id, parent_id, duration, psm, call_type, space_id, span_type, span_name, method, status_code, input, output, object_storage, system_tags_string, system_tags_long, system_tags_float, tags_string, tags_long, tags_bool, tags_float, tags_byte, reserve_create_time, logic_delete_date FROM `observability_spans` WHERE 1 = 1 AND start_time >= 1 AND start_time <= 2 LIMIT 100", + }, { filter: &loop_span.FilterFields{ FilterFields: []*loop_span.FilterField{ @@ -358,6 +414,19 @@ func TestBuildSql(t *testing.T) { }, expectedSql: "SELECT start_time, logid, span_id, trace_id, parent_id, duration, psm, call_type, space_id, span_type, span_name, method, status_code, input, output, object_storage, system_tags_string, system_tags_long, system_tags_float, tags_string, tags_long, tags_bool, tags_float, tags_byte, reserve_create_time, logic_delete_date FROM `observability_spans` WHERE tags_bool['custom_tag_bool'] = 1 AND start_time >= 1 AND start_time <= 2 LIMIT 100", }, + { + filter: &loop_span.FilterFields{ + FilterFields: []*loop_span.FilterField{ + { + FieldName: "custom_tag_bool", + FieldType: loop_span.FieldTypeBool, + Values: []string{"true"}, + QueryType: ptr.Of(loop_span.QueryTypeEnumNotEq), + }, + }, + }, + expectedSql: "SELECT start_time, logid, span_id, trace_id, parent_id, duration, psm, call_type, space_id, span_type, span_name, method, status_code, input, output, object_storage, system_tags_string, system_tags_long, system_tags_float, tags_string, tags_long, tags_bool, tags_float, tags_byte, reserve_create_time, logic_delete_date FROM `observability_spans` WHERE tags_bool['custom_tag_bool'] != 1 AND start_time >= 1 AND start_time <= 2 LIMIT 100", + }, { filter: &loop_span.FilterFields{ FilterFields: []*loop_span.FilterField{ @@ -384,13 +453,89 @@ func TestBuildSql(t *testing.T) { }, expectedSql: "SELECT start_time, logid, span_id, trace_id, parent_id, duration, psm, call_type, space_id, span_type, span_name, method, status_code, input, output, object_storage, system_tags_string, system_tags_long, system_tags_float, tags_string, tags_long, tags_bool, tags_float, tags_byte, reserve_create_time, logic_delete_date FROM `observability_spans` WHERE `input` NOT like '%123%' AND start_time >= 1 AND start_time <= 2 LIMIT 100", }, + { + filter: &loop_span.FilterFields{ + FilterFields: []*loop_span.FilterField{ + { + FieldName: "manual_feedback_abc", + FieldType: loop_span.FieldTypeString, + Values: []string{"123"}, + QueryType: ptr.Of(loop_span.QueryTypeEnumIn), + }, + }, + }, + expectedSql: "SELECT start_time, logid, span_id, trace_id, parent_id, duration, psm, call_type, space_id, span_type, span_name, method, status_code, input, output, object_storage, system_tags_string, system_tags_long, system_tags_float, tags_string, tags_long, tags_bool, tags_float, tags_byte, reserve_create_time, logic_delete_date FROM `observability_spans` WHERE span_id in (SELECT span_id FROM `observability_annotations` WHERE (annotation_type = 'manual_feedback' AND key = 'abc' AND value_string IN ('123')) AND deleted_at = 0 AND start_time >= 1 AND start_time <= 2 SETTINGS final = 1) AND start_time >= 1 AND start_time <= 2 LIMIT 100", + }, + { + filter: &loop_span.FilterFields{ + FilterFields: []*loop_span.FilterField{ + { + FieldName: "manual_feedback_abc", + FieldType: loop_span.FieldTypeLong, + Values: []string{"123"}, + QueryType: ptr.Of(loop_span.QueryTypeEnumIn), + }, + }, + }, + expectedSql: "SELECT start_time, logid, span_id, trace_id, parent_id, duration, psm, call_type, space_id, span_type, span_name, method, status_code, input, output, object_storage, system_tags_string, system_tags_long, system_tags_float, tags_string, tags_long, tags_bool, tags_float, tags_byte, reserve_create_time, logic_delete_date FROM `observability_spans` WHERE span_id in (SELECT span_id FROM `observability_annotations` WHERE (annotation_type = 'manual_feedback' AND key = 'abc' AND value_long IN (123)) AND deleted_at = 0 AND start_time >= 1 AND start_time <= 2 SETTINGS final = 1) AND start_time >= 1 AND start_time <= 2 LIMIT 100", + }, + { + filter: &loop_span.FilterFields{ + FilterFields: []*loop_span.FilterField{ + { + FieldName: "manual_feedback_abc", + FieldType: loop_span.FieldTypeDouble, + Values: []string{"123.1"}, + QueryType: ptr.Of(loop_span.QueryTypeEnumIn), + }, + }, + }, + expectedSql: "SELECT start_time, logid, span_id, trace_id, parent_id, duration, psm, call_type, space_id, span_type, span_name, method, status_code, input, output, object_storage, system_tags_string, system_tags_long, system_tags_float, tags_string, tags_long, tags_bool, tags_float, tags_byte, reserve_create_time, logic_delete_date FROM `observability_spans` WHERE span_id in (SELECT span_id FROM `observability_annotations` WHERE (annotation_type = 'manual_feedback' AND key = 'abc' AND value_float IN (123.1)) AND deleted_at = 0 AND start_time >= 1 AND start_time <= 2 SETTINGS final = 1) AND start_time >= 1 AND start_time <= 2 LIMIT 100", + }, + { + filter: &loop_span.FilterFields{ + FilterFields: []*loop_span.FilterField{ + { + FieldName: "manual_feedback_abc", + FieldType: loop_span.FieldTypeBool, + Values: []string{"true"}, + QueryType: ptr.Of(loop_span.QueryTypeEnumIn), + }, + }, + }, + expectedSql: "SELECT start_time, logid, span_id, trace_id, parent_id, duration, psm, call_type, space_id, span_type, span_name, method, status_code, input, output, object_storage, system_tags_string, system_tags_long, system_tags_float, tags_string, tags_long, tags_bool, tags_float, tags_byte, reserve_create_time, logic_delete_date FROM `observability_spans` WHERE span_id in (SELECT span_id FROM `observability_annotations` WHERE (annotation_type = 'manual_feedback' AND key = 'abc' AND value_bool IN (1)) AND deleted_at = 0 AND start_time >= 1 AND start_time <= 2 SETTINGS final = 1) AND start_time >= 1 AND start_time <= 2 LIMIT 100", + }, + { + filter: &loop_span.FilterFields{ + FilterFields: []*loop_span.FilterField{ + { + FieldName: "manual_feedback_abc", + FieldType: loop_span.FieldTypeBool, + Values: []string{"true"}, + QueryType: ptr.Of(loop_span.QueryTypeEnumIn), + }, + { + FieldName: loop_span.SpanFieldSpaceId, + FieldType: loop_span.FieldTypeString, + Values: []string{"123"}, + QueryType: ptr.Of(loop_span.QueryTypeEnumIn), + }, + }, + }, + expectedSql: "SELECT start_time, logid, span_id, trace_id, parent_id, duration, psm, call_type, space_id, span_type, span_name, method, status_code, input, output, object_storage, system_tags_string, system_tags_long, system_tags_float, tags_string, tags_long, tags_bool, tags_float, tags_byte, reserve_create_time, logic_delete_date FROM `observability_spans` WHERE (span_id in (SELECT span_id FROM `observability_annotations` WHERE (annotation_type = 'manual_feedback' AND key = 'abc' AND value_bool IN (1) AND space_id IN ('123')) AND deleted_at = 0 AND start_time >= 1 AND start_time <= 2 SETTINGS final = 1) AND `space_id` IN ('123')) AND start_time >= 1 AND start_time <= 2 LIMIT 100", + }, } for _, tc := range testCases { - qDb, err := new(SpansCkDaoImpl).buildSingleSql(context.Background(), db, "observability_spans", &dao.QueryParam{ - StartTime: 1, - EndTime: 2, - Filters: tc.filter, - Limit: 100, + qDb, err := new(SpansCkDaoImpl).buildSingleSql(context.Background(), &buildSqlParam{ + spanTable: "observability_spans", + annoTable: "observability_annotations", + queryParam: &dao.QueryParam{ + StartTime: 1, + EndTime: 2, + Filters: tc.filter, + Limit: 100, + }, + db: db, }) assert.Nil(t, err) sql := qDb.ToSQL(func(tx *gorm.DB) *gorm.DB { @@ -570,11 +715,15 @@ func TestQueryTypeEnumNotMatchSqlExceptionCases(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - qDb, err := new(SpansCkDaoImpl).buildSingleSql(context.Background(), db, "observability_spans", &dao.QueryParam{ - StartTime: 1, - EndTime: 2, - Filters: tc.filter, - Limit: 100, + qDb, err := new(SpansCkDaoImpl).buildSingleSql(context.Background(), &buildSqlParam{ + spanTable: "observability_spans", + queryParam: &dao.QueryParam{ + StartTime: 1, + EndTime: 2, + Filters: tc.filter, + Limit: 100, + }, + db: db, }) if tc.shouldError { @@ -717,11 +866,15 @@ func TestQueryTypeEnumNotMatchComplexScenarios(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - qDb, err := new(SpansCkDaoImpl).buildSingleSql(context.Background(), db, "observability_spans", &dao.QueryParam{ - StartTime: 1, - EndTime: 2, - Filters: tc.filter, - Limit: 100, + qDb, err := new(SpansCkDaoImpl).buildSingleSql(context.Background(), &buildSqlParam{ + spanTable: "observability_spans", + queryParam: &dao.QueryParam{ + StartTime: 1, + EndTime: 2, + Filters: tc.filter, + Limit: 100, + }, + db: db, }) assert.NoError(t, err, "Unexpected error for test case: %s", tc.name) sql := qDb.ToSQL(func(tx *gorm.DB) *gorm.DB { diff --git a/release/deployment/docker-compose/bootstrap/clickhouse-init/init-sql/observability_annotations.sql b/release/deployment/docker-compose/bootstrap/clickhouse-init/init-sql/observability_annotations.sql new file mode 100755 index 0000000000..03645b6ffa --- /dev/null +++ b/release/deployment/docker-compose/bootstrap/clickhouse-init/init-sql/observability_annotations.sql @@ -0,0 +1,32 @@ +CREATE TABLE IF NOT EXISTS `observability_annotations` ( + `id` String, + `span_id` String, + `trace_id` String, + `start_time` Int64, + `space_id` String, + `annotation_type` String, + `annotation_index` Array(String), + `key` String, + `value_type` String, + `value_string` String, + `value_long` Int64, + `value_float` Float64, + `value_bool` Bool, + `reasoning` String, + `correction` String, + `metadata` String, + `status` String, + `created_by` String, + `created_at` UInt64, + `updated_by` String, + `updated_at` UInt64, + `deleted_at` UInt64, + `start_date` Date, + INDEX idx_id id TYPE bloom_filter() GRANULARITY 1, + INDEX idx_span_id span_id TYPE bloom_filter() GRANULARITY 1, + INDEX idx_trace_id trace_id TYPE bloom_filter() GRANULARITY 1, + INDEX idx_space_id space_id TYPE bloom_filter() GRANULARITY 1, + INDEX idx_annotation_type annotation_type TYPE bloom_filter() GRANULARITY 1 +) ENGINE = ReplacingMergeTree(updated_at) PARTITION BY toDate(start_time / 1000000) +PRIMARY KEY (start_time) +ORDER BY (start_time, id); \ No newline at end of file diff --git a/release/deployment/docker-compose/conf/observability.yaml b/release/deployment/docker-compose/conf/observability.yaml index 57f219d28c..0cf7a6ad5f 100644 --- a/release/deployment/docker-compose/conf/observability.yaml +++ b/release/deployment/docker-compose/conf/observability.yaml @@ -87,9 +87,10 @@ trace_tenant_cfg: cozeloop: 365d: span_table: "observability_spans" + anno_table: "observability_annotations" default_ingest_tenant: "cozeloop" tenants_support_annotation: - cozeloop: false + cozeloop: true trace_field_meta_info: available_fields: @@ -203,6 +204,8 @@ trace_field_meta_info: - "exist" - "not_exist" support_custom: true + feedback_manual: + support_custom: true field_metas: default: root_span: @@ -407,3 +410,10 @@ task_mq_consumer_config: topic: "trace_to_task" consumer_group: "trace_to_task_cg" worker_num: 4 + + +annotation_source_cfg: + source_cfg: + default: + tenant: ["cozeloop"] + "annotation_type": "openapi_feedback" diff --git a/release/deployment/helm-chart/charts/app/bootstrap/init/clickhouse/init-sql/observability_annotations.sql b/release/deployment/helm-chart/charts/app/bootstrap/init/clickhouse/init-sql/observability_annotations.sql new file mode 100755 index 0000000000..03645b6ffa --- /dev/null +++ b/release/deployment/helm-chart/charts/app/bootstrap/init/clickhouse/init-sql/observability_annotations.sql @@ -0,0 +1,32 @@ +CREATE TABLE IF NOT EXISTS `observability_annotations` ( + `id` String, + `span_id` String, + `trace_id` String, + `start_time` Int64, + `space_id` String, + `annotation_type` String, + `annotation_index` Array(String), + `key` String, + `value_type` String, + `value_string` String, + `value_long` Int64, + `value_float` Float64, + `value_bool` Bool, + `reasoning` String, + `correction` String, + `metadata` String, + `status` String, + `created_by` String, + `created_at` UInt64, + `updated_by` String, + `updated_at` UInt64, + `deleted_at` UInt64, + `start_date` Date, + INDEX idx_id id TYPE bloom_filter() GRANULARITY 1, + INDEX idx_span_id span_id TYPE bloom_filter() GRANULARITY 1, + INDEX idx_trace_id trace_id TYPE bloom_filter() GRANULARITY 1, + INDEX idx_space_id space_id TYPE bloom_filter() GRANULARITY 1, + INDEX idx_annotation_type annotation_type TYPE bloom_filter() GRANULARITY 1 +) ENGINE = ReplacingMergeTree(updated_at) PARTITION BY toDate(start_time / 1000000) +PRIMARY KEY (start_time) +ORDER BY (start_time, id); \ No newline at end of file diff --git a/release/deployment/helm-chart/umbrella/conf/observability.yaml b/release/deployment/helm-chart/umbrella/conf/observability.yaml index 092c474416..0cf7a6ad5f 100644 --- a/release/deployment/helm-chart/umbrella/conf/observability.yaml +++ b/release/deployment/helm-chart/umbrella/conf/observability.yaml @@ -87,9 +87,10 @@ trace_tenant_cfg: cozeloop: 365d: span_table: "observability_spans" + anno_table: "observability_annotations" default_ingest_tenant: "cozeloop" tenants_support_annotation: - cozeloop: false + cozeloop: true trace_field_meta_info: available_fields: @@ -203,6 +204,8 @@ trace_field_meta_info: - "exist" - "not_exist" support_custom: true + feedback_manual: + support_custom: true field_metas: default: root_span: @@ -356,17 +359,6 @@ query_trace_rate_limit_config: space_max_qps: 123456: 100 -key_columns: - - "start_time" - - "span_id" - - "parent_id" - - "duration" - - "span_type" - - "span_name" - - "status_code" - - "tags_long" - - "logic_delete_date" - key_span_type: default: - "model" @@ -418,3 +410,10 @@ task_mq_consumer_config: topic: "trace_to_task" consumer_group: "trace_to_task_cg" worker_num: 4 + + +annotation_source_cfg: + source_cfg: + default: + tenant: ["cozeloop"] + "annotation_type": "openapi_feedback"