Skip to content

Commit 410db04

Browse files
committed
feat: get custom metrics
1 parent 8be2c9a commit 410db04

File tree

7 files changed

+107
-17
lines changed

7 files changed

+107
-17
lines changed

server/querier/engine/clickhouse/clickhouse.go

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"time"
2929

3030
//"github.com/k0kubun/pp"
31+
"github.com/bitly/go-simplejson"
3132
logging "github.com/op/go-logging"
3233
"github.com/xwb1989/sqlparser"
3334

@@ -112,6 +113,7 @@ type CHEngine struct {
112113
ORGID string
113114
Language string
114115
NativeField map[string]*metrics.Metrics
116+
CustomMetrics map[string]*simplejson.Json
115117
}
116118

117119
func init() {
@@ -1329,6 +1331,15 @@ func (e *CHEngine) TransFrom(froms sqlparser.TableExprs) error {
13291331
e.Table = table
13301332
// native field
13311333
if config.ControllerCfg.DFWebService.Enabled && (slices.Contains([]string{chCommon.DB_NAME_DEEPFLOW_ADMIN, chCommon.DB_NAME_DEEPFLOW_TENANT, chCommon.DB_NAME_APPLICATION_LOG, chCommon.DB_NAME_EXT_METRICS}, e.DB) || slices.Contains([]string{chCommon.TABLE_NAME_L7_FLOW_LOG, chCommon.TABLE_NAME_EVENT, chCommon.TABLE_NAME_FILE_EVENT}, e.Table)) {
1334+
// get custom-metrics
1335+
var err error
1336+
if e.CustomMetrics == nil {
1337+
e.CustomMetrics, err = chCommon.GetCustomMetrics(e.ORGID)
1338+
if err != nil {
1339+
log.Error(err.Error())
1340+
}
1341+
}
1342+
customMetrics := e.CustomMetrics
13321343
e.NativeField = map[string]*metrics.Metrics{}
13331344
getNativeUrl := fmt.Sprintf("http://localhost:%d/v1/native-fields/?db=%s&table_name=%s", config.ControllerCfg.ListenPort, e.DB, e.Table)
13341345
resp, err := ctlcommon.CURLPerform("GET", getNativeUrl, nil, ctlcommon.WithHeader(ctlcommon.HEADER_KEY_X_ORG_ID, e.ORGID))
@@ -1346,9 +1357,18 @@ func (e *CHEngine) TransFrom(froms sqlparser.TableExprs) error {
13461357
continue
13471358
}
13481359
if fieldType == chCommon.NATIVE_FIELD_TYPE_METRIC {
1360+
var mUnit string
1361+
mType := metrics.METRICS_TYPE_COUNTER
1362+
customMetric, ok := customMetrics[fmt.Sprintf("%s.%s.%s", e.DB, e.Table, nativeMetric)]
1363+
if ok {
1364+
mType = customMetric.Get("TYPE").MustInt()
1365+
mUnit = customMetric.Get("UNIT").MustString()
1366+
displayName = customMetric.Get("DISPLAY_NAME").MustString()
1367+
description = customMetric.Get("DESCRIPTION").MustString()
1368+
}
13491369
metric := metrics.NewMetrics(
13501370
0, nativeMetric,
1351-
displayName, displayName, displayName, "", "", "", metrics.METRICS_TYPE_COUNTER,
1371+
displayName, displayName, displayName, mUnit, mUnit, mUnit, mType,
13521372
chCommon.NATIVE_FIELD_CATEGORY_METRICS, []bool{true, true, true}, "", table, description, description, description, "", "",
13531373
)
13541374
e.NativeField[nativeMetric] = metric
@@ -1872,7 +1892,7 @@ func (e *CHEngine) parseSelectBinaryExpr(node sqlparser.Expr) (binary Function,
18721892
if fieldFunc != nil {
18731893
return fieldFunc, nil
18741894
}
1875-
metricStruct, ok := metrics.GetAggMetrics(field, e.DB, e.Table, e.ORGID, e.NativeField)
1895+
metricStruct, ok := metrics.GetAggMetrics(field, e.DB, e.Table, e.ORGID, e.NativeField, e.CustomMetrics)
18761896
if ok {
18771897
return &Field{Value: metricStruct.DBField}, nil
18781898
}
@@ -1985,7 +2005,7 @@ func (e *CHEngine) parseWhere(node sqlparser.Expr, w *Where, isCheck bool) (view
19852005
switch comparExpr.(type) {
19862006
case *sqlparser.ColName, *sqlparser.SQLVal:
19872007
whereTag := chCommon.ParseAlias(node.Left)
1988-
metricStruct, ok := metrics.GetMetrics(whereTag, e.DB, e.Table, e.ORGID, e.NativeField)
2008+
metricStruct, ok := metrics.GetMetrics(whereTag, e.DB, e.Table, e.ORGID, e.NativeField, e.CustomMetrics)
19892009
if ok && metricStruct.Type != metrics.METRICS_TYPE_TAG {
19902010
whereTag = metricStruct.DBField
19912011
}
@@ -2013,7 +2033,7 @@ func (e *CHEngine) parseWhere(node sqlparser.Expr, w *Where, isCheck bool) (view
20132033
return nil, errors.New(fmt.Sprintf("parse where error: %s(%T)", sqlparser.String(node), node))
20142034
}
20152035
whereTag := chCommon.ParseAlias(node.Expr)
2016-
metricStruct, ok := metrics.GetMetrics(whereTag, e.DB, e.Table, e.ORGID, e.NativeField)
2036+
metricStruct, ok := metrics.GetMetrics(whereTag, e.DB, e.Table, e.ORGID, e.NativeField, e.CustomMetrics)
20172037
if ok && metricStruct.Type != metrics.METRICS_TYPE_TAG {
20182038
whereTag = metricStruct.DBField
20192039
}

server/querier/engine/clickhouse/common/const.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,10 @@ const (
5757
NATIVE_FIELD_STATE_NORMAL = 1
5858
)
5959

60+
const (
61+
API_CUSTOM_METRICS_FORMAT = "http://localhost:%d/v1/custom-metrics"
62+
)
63+
6064
var DB_TABLE_MAP = map[string][]string{
6165
DB_NAME_FLOW_LOG: []string{"l4_flow_log", "l7_flow_log", "l4_packet", "l7_packet"},
6266
DB_NAME_FLOW_METRICS: []string{"network", "network_map", "application", "application_map", "traffic_policy"},

server/querier/engine/clickhouse/common/utils.go

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,13 @@ import (
2828
"strconv"
2929
"strings"
3030

31+
"github.com/bitly/go-simplejson"
32+
"github.com/xwb1989/sqlparser"
33+
34+
ctlcommon "github.com/deepflowio/deepflow/server/controller/common"
3135
"github.com/deepflowio/deepflow/server/querier/config"
3236
"github.com/deepflowio/deepflow/server/querier/engine/clickhouse/client"
3337
logging "github.com/op/go-logging"
34-
"github.com/xwb1989/sqlparser"
3538
)
3639

3740
var log = logging.MustGetLogger("common")
@@ -252,3 +255,25 @@ func GetExtTables(db, where, queryCacheTTL, orgID string, useQueryCache bool, ct
252255
}
253256
return values
254257
}
258+
259+
func GetCustomMetrics(orgID string) (map[string]*simplejson.Json, error) {
260+
result := make(map[string]*simplejson.Json)
261+
url := fmt.Sprintf(API_CUSTOM_METRICS_FORMAT, config.ControllerCfg.ListenPort)
262+
resp, err := ctlcommon.CURLPerform("GET", url, nil,
263+
ctlcommon.WithHeader(ctlcommon.HEADER_KEY_X_ORG_ID, orgID),
264+
ctlcommon.WithHeader(ctlcommon.HEADER_KEY_X_USER_ID, strconv.Itoa(ctlcommon.USER_ID_SUPER_ADMIN)),
265+
ctlcommon.WithHeader(ctlcommon.HEADER_KEY_X_USER_TYPE, strconv.Itoa(ctlcommon.USER_TYPE_SUPER_ADMIN)),
266+
)
267+
if err != nil {
268+
return result, fmt.Errorf("request controller url (%s) failed: %s", url, err.Error())
269+
}
270+
resultArray := resp.Get("DATA").MustArray()
271+
for i := range resultArray {
272+
item := resp.Get("DATA").GetIndex(i)
273+
db := item.Get("DB").MustString()
274+
table := item.Get("TABLE").MustString()
275+
name := item.Get("NAME").MustString()
276+
result[fmt.Sprintf("%s.%s.%s", db, table, name)] = item
277+
}
278+
return result, nil
279+
}

server/querier/engine/clickhouse/function.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ func GetAggFunc(name string, args []string, alias string, derivativeArgs []strin
107107
if !ok {
108108
return nil, 0, "", nil
109109
}
110-
metricStruct, ok := metrics.GetAggMetrics(field, e.DB, e.Table, e.ORGID, e.NativeField)
110+
metricStruct, ok := metrics.GetAggMetrics(field, e.DB, e.Table, e.ORGID, e.NativeField, e.CustomMetrics)
111111
if !ok {
112112
return nil, 0, "", nil
113113
}
@@ -275,7 +275,7 @@ func GetTopKTrans(name string, args []string, alias string, e *CHEngine) (Statem
275275
field = field[5 : len(field)-1]
276276
isEnum = true
277277
}
278-
metricStruct, ok = metrics.GetAggMetrics(field, e.DB, e.Table, e.ORGID, e.NativeField)
278+
metricStruct, ok = metrics.GetAggMetrics(field, e.DB, e.Table, e.ORGID, e.NativeField, e.CustomMetrics)
279279
if !ok || metricStruct.Type == metrics.METRICS_TYPE_ARRAY {
280280
return nil, 0, "", nil
281281
}
@@ -351,7 +351,7 @@ func GetUniqTrans(name string, args []string, alias string, e *CHEngine) (Statem
351351
var metricStruct *metrics.Metrics
352352
for _, field := range fields {
353353
field = strings.Trim(field, "`")
354-
metricStruct, ok = metrics.GetAggMetrics(field, e.DB, e.Table, e.ORGID, e.NativeField)
354+
metricStruct, ok = metrics.GetAggMetrics(field, e.DB, e.Table, e.ORGID, e.NativeField, e.CustomMetrics)
355355
if !ok || metricStruct.Type == metrics.METRICS_TYPE_ARRAY {
356356
return nil, 0, "", nil
357357
}

server/querier/engine/clickhouse/metrics/ext_common.go

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,16 +59,29 @@ func GetExtMetrics(db, table, where, queryCacheTTL, orgID string, useQueryCache
5959
log.Error(err)
6060
return nil, err
6161
}
62+
customMetrics, err := common.GetCustomMetrics(orgID)
63+
if err != nil {
64+
log.Error(err.Error())
65+
}
6266
for i, value := range externalMetricFloatRst.Values {
6367
tagName := value.([]interface{})[0]
6468
tableName := value.([]interface{})[1].(string)
6569
externalTag := tagName.(string)
6670
metrics_names_field, metrics_values_field := METRICS_ARRAY_NAME_MAP[db][0], METRICS_ARRAY_NAME_MAP[db][1]
6771
dbField := fmt.Sprintf("if(indexOf(%s, '%s')=0, null, %s[indexOf(%s, '%s')])", metrics_names_field, externalTag, metrics_values_field, metrics_names_field, externalTag)
6872
metricName := fmt.Sprintf("metrics.%s", externalTag)
73+
var mUnit, description string
74+
mType := METRICS_TYPE_COUNTER
75+
customMetric, ok := customMetrics[fmt.Sprintf("%s.%s.%s", db, tableName, externalTag)]
76+
if ok {
77+
mType = customMetric.Get("TYPE").MustInt()
78+
mUnit = customMetric.Get("UNIT").MustString()
79+
metricName = customMetric.Get("DISPLAY_NAME").MustString()
80+
description = customMetric.Get("DESCRIPTION").MustString()
81+
}
6982
lm := NewMetrics(
70-
i, dbField, metricName, metricName, metricName, "", "", "", METRICS_TYPE_COUNTER,
71-
common.NATIVE_FIELD_CATEGORY_METRICS, []bool{true, true, true}, "", tableName, "", "", "", "", "",
83+
i, dbField, metricName, metricName, metricName, mUnit, mUnit, mUnit, mType,
84+
common.NATIVE_FIELD_CATEGORY_METRICS, []bool{true, true, true}, "", tableName, description, description, description, "", "",
7285
)
7386
loadMetrics[fmt.Sprintf("%s-%s", metricName, tableName)] = lm
7487
}
@@ -101,8 +114,17 @@ func GetExtMetrics(db, table, where, queryCacheTTL, orgID string, useQueryCache
101114
if fieldType != common.NATIVE_FIELD_TYPE_METRIC {
102115
continue
103116
}
117+
var mUnit string
118+
mType := METRICS_TYPE_COUNTER
119+
customMetric, ok := customMetrics[fmt.Sprintf("%s.%s.%s", db, table, nativeMetric)]
120+
if ok {
121+
mType = customMetric.Get("TYPE").MustInt()
122+
mUnit = customMetric.Get("UNIT").MustString()
123+
displayName = customMetric.Get("DISPLAY_NAME").MustString()
124+
description = customMetric.Get("DESCRIPTION").MustString()
125+
}
104126
lm := NewMetrics(
105-
len(loadMetrics), nativeMetric, displayName, displayName, displayName, "", "", "", METRICS_TYPE_COUNTER,
127+
len(loadMetrics), nativeMetric, displayName, displayName, displayName, mUnit, mUnit, mUnit, mType,
106128
common.NATIVE_FIELD_CATEGORY_METRICS, []bool{true, true, true}, "", table, description, description, description, "", "",
107129
)
108130
loadMetrics[fmt.Sprintf("%s-%s", nativeMetric, table)] = lm

server/querier/engine/clickhouse/metrics/metrics.go

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"slices"
2525
"strings"
2626

27+
simplejson "github.com/bitly/go-simplejson"
2728
"github.com/deepflowio/deepflow/server/querier/common"
2829
"github.com/deepflowio/deepflow/server/querier/config"
2930
ckcommon "github.com/deepflowio/deepflow/server/querier/engine/clickhouse/common"
@@ -116,7 +117,7 @@ func NewReplaceMetrics(dbField string, condition string) *Metrics {
116117
}
117118
}
118119

119-
func GetAggMetrics(field, db, table, orgID string, nativeField map[string]*Metrics) (*Metrics, bool) {
120+
func GetAggMetrics(field, db, table, orgID string, nativeField map[string]*Metrics, customMetrics map[string]*simplejson.Json) (*Metrics, bool) {
120121
field = strings.Trim(field, "`")
121122
if field == COUNT_METRICS_NAME {
122123
return &Metrics{
@@ -129,7 +130,7 @@ func GetAggMetrics(field, db, table, orgID string, nativeField map[string]*Metri
129130
Table: table,
130131
}, true
131132
}
132-
return GetMetrics(field, db, table, orgID, nativeField)
133+
return GetMetrics(field, db, table, orgID, nativeField, customMetrics)
133134
}
134135

135136
func GetTagTypeMetrics(tagDescriptions *common.Result, newAllMetrics map[string]*Metrics, db, table, orgID string) error {
@@ -225,7 +226,7 @@ func GetTagTypeMetrics(tagDescriptions *common.Result, newAllMetrics map[string]
225226
return nil
226227
}
227228

228-
func GetMetrics(field, db, table, orgID string, nativeField map[string]*Metrics) (*Metrics, bool) {
229+
func GetMetrics(field, db, table, orgID string, nativeField map[string]*Metrics, customMetrics map[string]*simplejson.Json) (*Metrics, bool) {
229230
newAllMetrics := map[string]*Metrics{}
230231
field = strings.Trim(field, "`")
231232
// flow_tag database has no metrics
@@ -241,17 +242,35 @@ func GetMetrics(field, db, table, orgID string, nativeField map[string]*Metrics)
241242
)
242243
return metric, true
243244
}
245+
244246
// dynamic metrics
245247
if slices.Contains([]string{ckcommon.DB_NAME_DEEPFLOW_ADMIN, ckcommon.DB_NAME_DEEPFLOW_TENANT, ckcommon.DB_NAME_APPLICATION_LOG, ckcommon.DB_NAME_EXT_METRICS}, db) || slices.Contains([]string{ckcommon.TABLE_NAME_L7_FLOW_LOG, ckcommon.TABLE_NAME_EVENT, ckcommon.TABLE_NAME_FILE_EVENT}, table) {
246248
fieldSplit := strings.Split(field, ".")
247249
if len(fieldSplit) > 1 {
248250
if fieldSplit[0] == "metrics" {
249251
fieldName := strings.Replace(field, "metrics.", "", 1)
252+
var mUnit, description string
253+
mType := METRICS_TYPE_COUNTER
254+
if customMetrics == nil {
255+
var fetchErr error
256+
customMetrics, fetchErr = ckcommon.GetCustomMetrics(orgID)
257+
if fetchErr != nil {
258+
log.Error(fetchErr.Error())
259+
customMetrics = map[string]*simplejson.Json{}
260+
}
261+
}
262+
customMetric, ok := customMetrics[fmt.Sprintf("%s.%s.%s", db, table, fieldName)]
263+
if ok {
264+
mType = customMetric.Get("TYPE").MustInt()
265+
mUnit = customMetric.Get("UNIT").MustString()
266+
field = customMetric.Get("DISPLAY_NAME").MustString()
267+
description = customMetric.Get("DESCRIPTION").MustString()
268+
}
250269
metrics_names_field, metrics_values_field := METRICS_ARRAY_NAME_MAP[db][0], METRICS_ARRAY_NAME_MAP[db][1]
251270
metric := NewMetrics(
252271
0, fmt.Sprintf("if(indexOf(%s, '%s')=0,null,%s[indexOf(%s, '%s')])", metrics_names_field, fieldName, metrics_values_field, metrics_names_field, fieldName),
253-
field, field, field, "", "", "", METRICS_TYPE_COUNTER,
254-
ckcommon.NATIVE_FIELD_CATEGORY_METRICS, []bool{true, true, true}, "", table, "", "", "", "", "",
272+
field, field, field, mUnit, mUnit, mUnit, mType,
273+
ckcommon.NATIVE_FIELD_CATEGORY_METRICS, []bool{true, true, true}, "", table, description, description, description, "", "",
255274
)
256275
return metric, true
257276
} else if fieldSplit[0] == "tag" {

server/querier/engine/clickhouse/tag.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -303,7 +303,7 @@ func GetPrometheusAllTagTranslator(e *CHEngine) (string, string, error) {
303303
}
304304

305305
func GetMetricsTag(name string, alias string, e *CHEngine) (Statement, error) {
306-
metricStruct, ok := metrics.GetMetrics(strings.Trim(name, "`"), e.DB, e.Table, e.ORGID, e.NativeField)
306+
metricStruct, ok := metrics.GetMetrics(strings.Trim(name, "`"), e.DB, e.Table, e.ORGID, e.NativeField, e.CustomMetrics)
307307
if !ok {
308308
return nil, nil
309309
}

0 commit comments

Comments
 (0)