Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion server/querier/app/prometheus/service/label_values.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func getMetrics(ctx context.Context, args *model.PromMetaParams) (resp []string)
}
} else {
for _, table := range tables {
tableMetrics, _, _ := metrics.FormatMetricsToResult(db, table, where, "", args.OrgID, false, args.Context)
tableMetrics, _, _ := metrics.FormatMetricsToResult(db, table, where, "", args.OrgID, false, args.Context, nil)
for field, v := range tableMetrics {
if v.Category == METRICS_CATEGORY_TAG {
continue
Expand Down
38 changes: 33 additions & 5 deletions server/querier/engine/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"time"

//"github.com/k0kubun/pp"
"github.com/bitly/go-simplejson"
logging "github.com/op/go-logging"
"github.com/xwb1989/sqlparser"

Expand Down Expand Up @@ -112,6 +113,7 @@ type CHEngine struct {
ORGID string
Language string
NativeField map[string]*metrics.Metrics
CustomMetrics map[string]*simplejson.Json
}

func init() {
Expand Down Expand Up @@ -547,7 +549,15 @@ func (e *CHEngine) ParseShowSql(sql string, args *common.QuerierParams, DebugInf
where = visibilityWhere
sql = visibilitySql
}
result, err := metrics.GetMetricsDescriptions(e.DB, table, where, args.QueryCacheTTL, args.ORGID, args.UseQueryCache, e.Context)
if e.CustomMetrics == nil && config.ControllerCfg.DFWebService.Enabled {
var fetchErr error
e.CustomMetrics, fetchErr = chCommon.GetCustomMetrics(e.ORGID)
if fetchErr != nil {
log.Error(fetchErr.Error())
e.CustomMetrics = map[string]*simplejson.Json{}
}
}
result, err := metrics.GetMetricsDescriptions(e.DB, table, where, args.QueryCacheTTL, args.ORGID, args.UseQueryCache, e.Context, e.CustomMetrics)
if err != nil {
return nil, []string{}, true, err
}
Expand Down Expand Up @@ -1329,6 +1339,15 @@ func (e *CHEngine) TransFrom(froms sqlparser.TableExprs) error {
e.Table = table
// native field
if config.ControllerCfg.DFWebService.Enabled && (slices.Contains([]string{chCommon.DB_NAME_DEEPFLOW_ADMIN, chCommon.DB_NAME_DEEPFLOW_TENANT, chCommon.DB_NAME_APPLICATION_LOG, chCommon.DB_NAME_EXT_METRICS}, e.DB) || slices.Contains([]string{chCommon.TABLE_NAME_L7_FLOW_LOG, chCommon.TABLE_NAME_EVENT, chCommon.TABLE_NAME_FILE_EVENT}, e.Table)) {
// get custom-metrics
var err error
if e.CustomMetrics == nil {
e.CustomMetrics, err = chCommon.GetCustomMetrics(e.ORGID)
if err != nil {
log.Error(err.Error())
}
}
customMetrics := e.CustomMetrics
e.NativeField = map[string]*metrics.Metrics{}
getNativeUrl := fmt.Sprintf("http://localhost:%d/v1/native-fields/?db=%s&table_name=%s", config.ControllerCfg.ListenPort, e.DB, e.Table)
resp, err := ctlcommon.CURLPerform("GET", getNativeUrl, nil, ctlcommon.WithHeader(ctlcommon.HEADER_KEY_X_ORG_ID, e.ORGID))
Expand All @@ -1346,9 +1365,18 @@ func (e *CHEngine) TransFrom(froms sqlparser.TableExprs) error {
continue
}
if fieldType == chCommon.NATIVE_FIELD_TYPE_METRIC {
var mUnit string
mType := metrics.METRICS_TYPE_COUNTER
customMetric, ok := customMetrics[fmt.Sprintf("%s.%s.%s", e.DB, e.Table, nativeMetric)]
if ok {
mType = customMetric.Get("TYPE").MustInt()
mUnit = customMetric.Get("UNIT").MustString()
displayName = customMetric.Get("DISPLAY_NAME").MustString()
description = customMetric.Get("DESCRIPTION").MustString()
}
metric := metrics.NewMetrics(
0, nativeMetric,
displayName, displayName, displayName, "", "", "", metrics.METRICS_TYPE_COUNTER,
displayName, displayName, displayName, mUnit, mUnit, mUnit, mType,
chCommon.NATIVE_FIELD_CATEGORY_METRICS, []bool{true, true, true}, "", table, description, description, description, "", "",
)
e.NativeField[nativeMetric] = metric
Expand Down Expand Up @@ -1876,7 +1904,7 @@ func (e *CHEngine) parseSelectBinaryExpr(node sqlparser.Expr) (binary Function,
if fieldFunc != nil {
return fieldFunc, nil
}
metricStruct, ok := metrics.GetAggMetrics(field, e.DB, e.Table, e.ORGID, e.NativeField)
metricStruct, ok := metrics.GetAggMetrics(field, e.DB, e.Table, e.ORGID, e.NativeField, e.CustomMetrics)
if ok {
return &Field{Value: metricStruct.DBField}, nil
}
Expand Down Expand Up @@ -1989,7 +2017,7 @@ func (e *CHEngine) parseWhere(node sqlparser.Expr, w *Where, isCheck bool) (view
switch comparExpr.(type) {
case *sqlparser.ColName, *sqlparser.SQLVal:
whereTag := chCommon.ParseAlias(node.Left)
metricStruct, ok := metrics.GetMetrics(whereTag, e.DB, e.Table, e.ORGID, e.NativeField)
metricStruct, ok := metrics.GetMetrics(whereTag, e.DB, e.Table, e.ORGID, e.NativeField, e.CustomMetrics)
if ok && metricStruct.Type != metrics.METRICS_TYPE_TAG {
whereTag = metricStruct.DBField
}
Expand Down Expand Up @@ -2017,7 +2045,7 @@ func (e *CHEngine) parseWhere(node sqlparser.Expr, w *Where, isCheck bool) (view
return nil, errors.New(fmt.Sprintf("parse where error: %s(%T)", sqlparser.String(node), node))
}
whereTag := chCommon.ParseAlias(node.Expr)
metricStruct, ok := metrics.GetMetrics(whereTag, e.DB, e.Table, e.ORGID, e.NativeField)
metricStruct, ok := metrics.GetMetrics(whereTag, e.DB, e.Table, e.ORGID, e.NativeField, e.CustomMetrics)
if ok && metricStruct.Type != metrics.METRICS_TYPE_TAG {
whereTag = metricStruct.DBField
}
Expand Down
4 changes: 4 additions & 0 deletions server/querier/engine/clickhouse/common/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ const (
NATIVE_FIELD_STATE_NORMAL = 1
)

const (
API_CUSTOM_METRICS_FORMAT = "http://localhost:%d/v1/custom-metrics"
)

var DB_TABLE_MAP = map[string][]string{
DB_NAME_FLOW_LOG: []string{"l4_flow_log", "l7_flow_log", "l4_packet", "l7_packet"},
DB_NAME_FLOW_METRICS: []string{"network", "network_map", "application", "application_map", "traffic_policy"},
Expand Down
27 changes: 26 additions & 1 deletion server/querier/engine/clickhouse/common/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,13 @@ import (
"strconv"
"strings"

"github.com/bitly/go-simplejson"
"github.com/xwb1989/sqlparser"

ctlcommon "github.com/deepflowio/deepflow/server/controller/common"
"github.com/deepflowio/deepflow/server/querier/config"
"github.com/deepflowio/deepflow/server/querier/engine/clickhouse/client"
logging "github.com/op/go-logging"
"github.com/xwb1989/sqlparser"
)

var log = logging.MustGetLogger("common")
Expand Down Expand Up @@ -252,3 +255,25 @@ func GetExtTables(db, where, queryCacheTTL, orgID string, useQueryCache bool, ct
}
return values
}

func GetCustomMetrics(orgID string) (map[string]*simplejson.Json, error) {
result := make(map[string]*simplejson.Json)
url := fmt.Sprintf(API_CUSTOM_METRICS_FORMAT, config.ControllerCfg.ListenPort)
resp, err := ctlcommon.CURLPerform("GET", url, nil,
ctlcommon.WithHeader(ctlcommon.HEADER_KEY_X_ORG_ID, orgID),
ctlcommon.WithHeader(ctlcommon.HEADER_KEY_X_USER_ID, strconv.Itoa(ctlcommon.USER_ID_SUPER_ADMIN)),
ctlcommon.WithHeader(ctlcommon.HEADER_KEY_X_USER_TYPE, strconv.Itoa(ctlcommon.USER_TYPE_SUPER_ADMIN)),
)
if err != nil {
return result, fmt.Errorf("request controller url (%s) failed: %s", url, err.Error())
}
resultArray := resp.Get("DATA").MustArray()
for i := range resultArray {
item := resp.Get("DATA").GetIndex(i)
db := item.Get("DB").MustString()
table := item.Get("TABLE").MustString()
name := item.Get("NAME").MustString()
result[fmt.Sprintf("%s.%s.%s", db, table, name)] = item
}
return result, nil
}
2 changes: 1 addition & 1 deletion server/querier/engine/clickhouse/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -923,7 +923,7 @@ func (t *WhereTag) Trans(expr sqlparser.Expr, w *Where, e *CHEngine) (view.Node,
}
} else {
switch strings.Trim(t.Tag, "`") {
case "policy_type", "metric_value", "event_level", "team_id", "user_id", "target_tags", "_query_region", "_target_uid", "1", "_id", "event_id", "alert_time", "duration", "state":
case "policy_type", "metric_value", "event_level", "team_id", "user_id", "target_tags", "_query_region", "_target_uid", "1", "_id", "event_id", "alert_time", "duration", "state", "start_time", "end_time":
if strings.Contains(op, "match") {
filter = fmt.Sprintf("%s(%s,%s)", op, t.Tag, t.Value)
} else {
Expand Down
6 changes: 3 additions & 3 deletions server/querier/engine/clickhouse/function.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func GetAggFunc(name string, args []string, alias string, derivativeArgs []strin
if !ok {
return nil, 0, "", nil
}
metricStruct, ok := metrics.GetAggMetrics(field, e.DB, e.Table, e.ORGID, e.NativeField)
metricStruct, ok := metrics.GetAggMetrics(field, e.DB, e.Table, e.ORGID, e.NativeField, e.CustomMetrics)
if !ok {
return nil, 0, "", nil
}
Expand Down Expand Up @@ -275,7 +275,7 @@ func GetTopKTrans(name string, args []string, alias string, e *CHEngine) (Statem
field = field[5 : len(field)-1]
isEnum = true
}
metricStruct, ok = metrics.GetAggMetrics(field, e.DB, e.Table, e.ORGID, e.NativeField)
metricStruct, ok = metrics.GetAggMetrics(field, e.DB, e.Table, e.ORGID, e.NativeField, e.CustomMetrics)
if !ok || metricStruct.Type == metrics.METRICS_TYPE_ARRAY {
return nil, 0, "", nil
}
Expand Down Expand Up @@ -351,7 +351,7 @@ func GetUniqTrans(name string, args []string, alias string, e *CHEngine) (Statem
var metricStruct *metrics.Metrics
for _, field := range fields {
field = strings.Trim(field, "`")
metricStruct, ok = metrics.GetAggMetrics(field, e.DB, e.Table, e.ORGID, e.NativeField)
metricStruct, ok = metrics.GetAggMetrics(field, e.DB, e.Table, e.ORGID, e.NativeField, e.CustomMetrics)
if !ok || metricStruct.Type == metrics.METRICS_TYPE_ARRAY {
return nil, 0, "", nil
}
Expand Down
49 changes: 37 additions & 12 deletions server/querier/engine/clickhouse/metrics/ext_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"slices"
"strings"

simplejson "github.com/bitly/go-simplejson"
ctlcommon "github.com/deepflowio/deepflow/server/controller/common"
"github.com/deepflowio/deepflow/server/querier/config"
"github.com/deepflowio/deepflow/server/querier/engine/clickhouse/client"
Expand All @@ -32,7 +33,7 @@ var EXT_METRICS = map[string]*Metrics{}

func GetExtMetrics(db, table, where, queryCacheTTL, orgID string, useQueryCache bool, ctx context.Context) (map[string]*Metrics, error) {
loadMetrics := make(map[string]*Metrics)
if slices.Contains([]string{common.DB_NAME_DEEPFLOW_ADMIN, common.DB_NAME_DEEPFLOW_TENANT, common.DB_NAME_APPLICATION_LOG, common.DB_NAME_EXT_METRICS}, db) || slices.Contains([]string{common.TABLE_NAME_L7_FLOW_LOG, common.TABLE_NAME_EVENT, common.TABLE_NAME_FILE_EVENT}, table) {
if slices.Contains([]string{common.DB_NAME_DEEPFLOW_ADMIN, common.DB_NAME_DEEPFLOW_TENANT, common.DB_NAME_APPLICATION_LOG, common.DB_NAME_EXT_METRICS}, db) || slices.Contains([]string{common.TABLE_NAME_L7_FLOW_LOG}, table) {
externalChClient := client.Client{
Host: config.Cfg.Clickhouse.Host,
Port: config.Cfg.Clickhouse.Port,
Expand All @@ -59,27 +60,42 @@ func GetExtMetrics(db, table, where, queryCacheTTL, orgID string, useQueryCache
log.Error(err)
return nil, err
}
var customMetrics map[string]*simplejson.Json
if config.ControllerCfg.DFWebService.Enabled {
customMetrics, err = common.GetCustomMetrics(orgID)
if err != nil {
log.Error(err.Error())
}
}
for i, value := range externalMetricFloatRst.Values {
tagName := value.([]interface{})[0]
tableName := value.([]interface{})[1].(string)
externalTag := tagName.(string)
metrics_names_field, metrics_values_field := METRICS_ARRAY_NAME_MAP[db][0], METRICS_ARRAY_NAME_MAP[db][1]
dbField := fmt.Sprintf("if(indexOf(%s, '%s')=0, null, %s[indexOf(%s, '%s')])", metrics_names_field, externalTag, metrics_values_field, metrics_names_field, externalTag)
var mUnit, description string
mType := METRICS_TYPE_COUNTER
metricName := fmt.Sprintf("metrics.%s", externalTag)
displayName := metricName
customMetric, ok := customMetrics[fmt.Sprintf("%s.%s.%s", db, tableName, metricName)]
if ok {
mType = customMetric.Get("TYPE").MustInt()
mUnit = customMetric.Get("UNIT").MustString()
displayName = customMetric.Get("DISPLAY_NAME").MustString()
description = customMetric.Get("DESCRIPTION").MustString()
}
lm := NewMetrics(
i, dbField, metricName, metricName, metricName, "", "", "", METRICS_TYPE_COUNTER,
common.NATIVE_FIELD_CATEGORY_METRICS, []bool{true, true, true}, "", tableName, "", "", "", "", "",
i, dbField, displayName, displayName, displayName, mUnit, mUnit, mUnit, mType,
common.NATIVE_FIELD_CATEGORY_METRICS, []bool{true, true, true}, "", tableName, description, description, description, "", "",
)
loadMetrics[fmt.Sprintf("%s-%s", metricName, tableName)] = lm
}
if !slices.Contains([]string{common.TABLE_NAME_EVENT, common.TABLE_NAME_FILE_EVENT}, table) {
lm := NewMetrics(
len(loadMetrics), "metrics",
"metrics", "metrics", "metrics", "", "", "", METRICS_TYPE_ARRAY,
common.NATIVE_FIELD_CATEGORY_METRICS, []bool{true, true, true}, "", table, "", "", "", "", "",
)
loadMetrics[fmt.Sprintf("%s-%s", "metrics", table)] = lm
}
lm := NewMetrics(
len(loadMetrics), "metrics",
"metrics", "metrics", "metrics", "", "", "", METRICS_TYPE_ARRAY,
common.NATIVE_FIELD_CATEGORY_METRICS, []bool{true, true, true}, "", table, "", "", "", "", "",
)
loadMetrics[fmt.Sprintf("%s-%s", "metrics", table)] = lm

// native metrics
if config.ControllerCfg.DFWebService.Enabled {
Expand All @@ -101,8 +117,17 @@ func GetExtMetrics(db, table, where, queryCacheTTL, orgID string, useQueryCache
if fieldType != common.NATIVE_FIELD_TYPE_METRIC {
continue
}
var mUnit string
mType := METRICS_TYPE_COUNTER
customMetric, ok := customMetrics[fmt.Sprintf("%s.%s.%s", db, table, nativeMetric)]
if ok {
mType = customMetric.Get("TYPE").MustInt()
mUnit = customMetric.Get("UNIT").MustString()
displayName = customMetric.Get("DISPLAY_NAME").MustString()
description = customMetric.Get("DESCRIPTION").MustString()
}
lm := NewMetrics(
len(loadMetrics), nativeMetric, displayName, displayName, displayName, "", "", "", METRICS_TYPE_COUNTER,
len(loadMetrics), nativeMetric, displayName, displayName, displayName, mUnit, mUnit, mUnit, mType,
common.NATIVE_FIELD_CATEGORY_METRICS, []bool{true, true, true}, "", table, description, description, description, "", "",
)
loadMetrics[fmt.Sprintf("%s-%s", nativeMetric, table)] = lm
Expand Down
Loading
Loading