Skip to content

Commit c584743

Browse files
kidkidkidCoda-bot
andauthored
[feat][backend] ob add manual annotation (#194)
feat(backend): add annotation feature support with ClickHouse table and DAO implementation Co-authored-by: Coda <coda@bytedance.com>
1 parent 0ed1e00 commit c584743

10 files changed

Lines changed: 1082 additions & 132 deletions

File tree

backend/modules/observability/infra/repo/ck/annotation.go

Lines changed: 145 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,19 @@ package ck
55

66
import (
77
"context"
8+
"fmt"
9+
"strings"
810

11+
"github.com/coze-dev/coze-loop/backend/infra/backoff"
912
"github.com/coze-dev/coze-loop/backend/infra/ck"
13+
"github.com/coze-dev/coze-loop/backend/modules/observability/infra/repo/ck/convertor"
14+
"github.com/coze-dev/coze-loop/backend/modules/observability/infra/repo/ck/gorm_gen/model"
1015
"github.com/coze-dev/coze-loop/backend/modules/observability/infra/repo/dao"
16+
obErrorx "github.com/coze-dev/coze-loop/backend/modules/observability/pkg/errno"
17+
"github.com/coze-dev/coze-loop/backend/pkg/errorx"
18+
"github.com/coze-dev/coze-loop/backend/pkg/logs"
19+
"gorm.io/gorm"
20+
"gorm.io/gorm/clause"
1121
)
1222

1323
func NewAnnotationCkDaoImpl(db ck.Provider) (dao.IAnnotationDao, error) {
@@ -21,13 +31,146 @@ type AnnotationCkDaoImpl struct {
2131
}
2232

2333
func (a *AnnotationCkDaoImpl) Insert(ctx context.Context, params *dao.InsertAnnotationParam) error {
34+
if params == nil || len(params.Annotations) == 0 {
35+
return errorx.NewByCode(obErrorx.CommercialCommonInvalidParamCodeCode)
36+
}
37+
db := a.db.NewSession(ctx)
38+
annotations := convertor.AnnotationListPO2CKModels(params.Annotations)
39+
if err := backoff.RetryWithMaxTimes(ctx, 2, func() error {
40+
return db.Table(params.Table).Create(annotations).Error
41+
}); err != nil {
42+
logs.CtxError(ctx, "fail to insert annotations: %v", err)
43+
return errorx.WrapByCode(err, obErrorx.CommercialCommonInternalErrorCodeCode)
44+
}
45+
logs.CtxInfo(ctx, "insert annotations successfully, count: %d", len(params.Annotations))
2446
return nil
2547
}
2648

2749
func (a *AnnotationCkDaoImpl) Get(ctx context.Context, params *dao.GetAnnotationParam) (*dao.Annotation, error) {
28-
return nil, nil
50+
if params == nil || params.ID == "" {
51+
return nil, errorx.NewByCode(obErrorx.CommercialCommonInvalidParamCodeCode)
52+
}
53+
db, err := a.buildSql(ctx, &annoSqlParam{
54+
Tables: params.Tables,
55+
StartTime: params.StartTime,
56+
EndTime: params.EndTime,
57+
ID: params.ID,
58+
Limit: 1,
59+
})
60+
if err != nil {
61+
return nil, err
62+
}
63+
logs.CtxInfo(ctx, "Get Annotation SQL: %s", db.ToSQL(func(tx *gorm.DB) *gorm.DB {
64+
return tx.Find(nil)
65+
}))
66+
var annotations []*model.ObservabilityAnnotation
67+
if err := db.Find(&annotations).Error; err != nil {
68+
return nil, err
69+
}
70+
if len(annotations) == 0 {
71+
return nil, nil
72+
} else if len(annotations) > 1 {
73+
logs.CtxWarn(ctx, "multiple annotations found")
74+
}
75+
return convertor.AnnotationCKModel2PO(annotations[0]), nil
2976
}
3077

3178
func (a *AnnotationCkDaoImpl) List(ctx context.Context, params *dao.ListAnnotationsParam) ([]*dao.Annotation, error) {
32-
return nil, nil
79+
if params == nil || len(params.SpanIDs) == 0 {
80+
return nil, nil
81+
}
82+
db, err := a.buildSql(ctx, &annoSqlParam{
83+
Tables: params.Tables,
84+
StartTime: params.StartTime,
85+
EndTime: params.EndTime,
86+
SpanIDs: params.SpanIDs,
87+
DescByUpdatedAt: params.DescByUpdatedAt,
88+
Limit: params.Limit,
89+
})
90+
if err != nil {
91+
return nil, err
92+
}
93+
logs.CtxInfo(ctx, "List Annotations SQL: %s", db.ToSQL(func(tx *gorm.DB) *gorm.DB {
94+
return tx.Find(nil)
95+
}))
96+
var annotations []*model.ObservabilityAnnotation
97+
if err := db.Find(&annotations).Error; err != nil {
98+
return nil, err
99+
}
100+
return convertor.AnnotationListCKModels2PO(annotations), nil
101+
}
102+
103+
type annoSqlParam struct {
104+
Tables []string
105+
StartTime int64
106+
EndTime int64
107+
ID string
108+
SpanIDs []string
109+
DescByUpdatedAt bool
110+
Limit int32
111+
}
112+
113+
func (a *AnnotationCkDaoImpl) buildSql(ctx context.Context, param *annoSqlParam) (*gorm.DB, error) {
114+
db := a.db.NewSession(ctx)
115+
var tableQueries []*gorm.DB
116+
for _, table := range param.Tables {
117+
query, err := a.buildSingleSql(ctx, db, table, param)
118+
if err != nil {
119+
return nil, err
120+
}
121+
tableQueries = append(tableQueries, query)
122+
}
123+
if len(tableQueries) == 0 {
124+
return nil, fmt.Errorf("no table configured")
125+
} else if len(tableQueries) == 1 {
126+
query := tableQueries[0].ToSQL(func(tx *gorm.DB) *gorm.DB {
127+
return tx.Find(nil)
128+
})
129+
query += " SETTINGS final = 1"
130+
return db.Raw(query), nil
131+
} else {
132+
queries := make([]string, 0)
133+
for i := 0; i < len(tableQueries); i++ {
134+
query := tableQueries[i].ToSQL(func(tx *gorm.DB) *gorm.DB {
135+
return tx.Find(nil)
136+
})
137+
queries = append(queries, "("+query+")")
138+
}
139+
sql := fmt.Sprintf("SELECT * FROM (%s)", strings.Join(queries, " UNION ALL "))
140+
if param.DescByUpdatedAt {
141+
sql += " ORDER BY updated_at DESC"
142+
} else {
143+
sql += " ORDER BY created_at ASC"
144+
}
145+
sql += fmt.Sprintf(" LIMIT %d SETTINGS final = 1", param.Limit)
146+
return db.Raw(sql), nil
147+
}
148+
}
149+
150+
// buildSingleSql 构建单表查询SQL
151+
func (a *AnnotationCkDaoImpl) buildSingleSql(ctx context.Context, db *gorm.DB, tableName string, param *annoSqlParam) (*gorm.DB, error) {
152+
sqlQuery := db.
153+
Table(tableName).
154+
Where("deleted_at = 0")
155+
156+
if param.ID != "" {
157+
sqlQuery = sqlQuery.Where("id = ?", param.ID)
158+
}
159+
if len(param.SpanIDs) > 0 {
160+
sqlQuery = sqlQuery.Where("span_id IN (?)", param.SpanIDs)
161+
}
162+
sqlQuery = sqlQuery.
163+
Where("start_time >= ?", param.StartTime).
164+
Where("start_time <= ?", param.EndTime)
165+
if param.DescByUpdatedAt {
166+
sqlQuery = sqlQuery.Order(clause.OrderBy{Columns: []clause.OrderByColumn{
167+
{Column: clause.Column{Name: "updated_at"}, Desc: true},
168+
}})
169+
} else {
170+
sqlQuery = sqlQuery.Order(clause.OrderBy{Columns: []clause.OrderByColumn{
171+
{Column: clause.Column{Name: "created_at"}, Desc: false},
172+
}})
173+
}
174+
sqlQuery = sqlQuery.Limit(int(param.Limit))
175+
return sqlQuery, nil
33176
}

0 commit comments

Comments
 (0)