Skip to content

Commit 7f5bc80

Browse files
feat: extend l7_protocol enum
1 parent 207b04b commit 7f5bc80

9 files changed

Lines changed: 221 additions & 3 deletions

File tree

server/ingester/flow_log/log_data/l7_flow_log.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,7 @@ func L7FlowLogColumns() []*ckdb.Column {
222222
l7Columns = append(l7Columns, ckdb.NewColumn("_id", ckdb.UInt64))
223223
l7Columns = append(l7Columns, L7BaseColumns()...)
224224
l7Columns = append(l7Columns,
225-
ckdb.NewColumn("l7_protocol", ckdb.UInt8).SetIndex(ckdb.IndexNone).SetComment("0:未知 1:其他, 20:http1, 21:http2, 40:dubbo, 60:mysql, 80:redis, 100:kafka, 101:mqtt, 120:dns"),
225+
ckdb.NewColumn("l7_protocol", ckdb.UInt8).SetIndex(ckdb.IndexNone).SetComment("0:未知, 20:http1, 21:http2, 40:dubbo, 60:mysql, 61:postgresql, 62:oracle, 63:dameng, 64:db2, 65:tdsql, 66:oceanbase, 67:goldendb, 68:kingbase, 80:redis, 100:kafka, 101:mqtt, 120:dns"),
226226
ckdb.NewColumn("biz_protocol", ckdb.LowCardinalityString).SetIndex(ckdb.IndexNone).SetComment("应用协议"),
227227
ckdb.NewColumn("version", ckdb.LowCardinalityString).SetComment("协议版本"),
228228
ckdb.NewColumn("type", ckdb.UInt8).SetIndex(ckdb.IndexNone).SetComment("日志类型, 0:请求, 1:响应, 2:会话"),

server/libs/datatype/flow.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,11 @@ const (
165165
L7_PROTOCOL_POSTGRE L7Protocol = 61
166166
L7_PROTOCOL_ORACLE L7Protocol = 62
167167
L7_PROTOCOL_DAMENG L7Protocol = 63
168+
L7_PROTOCOL_DB2 L7Protocol = 64
169+
L7_PROTOCOL_TDSQL L7Protocol = 65
170+
L7_PROTOCOL_OCEANBASE L7Protocol = 66
171+
L7_PROTOCOL_GOLDENDB L7Protocol = 67
172+
L7_PROTOCOL_KINGBASE L7Protocol = 68
168173
L7_PROTOCOL_REDIS L7Protocol = 80
169174
L7_PROTOCOL_MONGODB L7Protocol = 81
170175
L7_PROTOCOL_MEMCACHED L7Protocol = 82
@@ -700,6 +705,36 @@ func (p L7Protocol) String(isTLS bool) string {
700705
} else {
701706
return "Dameng"
702707
}
708+
case L7_PROTOCOL_DB2:
709+
if isTLS {
710+
return "DB2_TLS"
711+
} else {
712+
return "DB2"
713+
}
714+
case L7_PROTOCOL_TDSQL:
715+
if isTLS {
716+
return "TDSQL_TLS"
717+
} else {
718+
return "TDSQL"
719+
}
720+
case L7_PROTOCOL_OCEANBASE:
721+
if isTLS {
722+
return "OceanBase_TLS"
723+
} else {
724+
return "OceanBase"
725+
}
726+
case L7_PROTOCOL_GOLDENDB:
727+
if isTLS {
728+
return "GoldenDB_TLS"
729+
} else {
730+
return "GoldenDB"
731+
}
732+
case L7_PROTOCOL_KINGBASE:
733+
if isTLS {
734+
return "Kingbase_TLS"
735+
} else {
736+
return "Kingbase"
737+
}
703738
case L7_PROTOCOL_ISO8583:
704739
if isTLS {
705740
return "ISO-8583_TLS"
@@ -823,6 +858,11 @@ var L7ProtocolStringMap = map[string]L7Protocol{
823858
strings.ToLower(L7_PROTOCOL_POSTGRE.String(false)): L7_PROTOCOL_POSTGRE,
824859
strings.ToLower(L7_PROTOCOL_ORACLE.String(false)): L7_PROTOCOL_ORACLE,
825860
strings.ToLower(L7_PROTOCOL_DAMENG.String(false)): L7_PROTOCOL_DAMENG,
861+
strings.ToLower(L7_PROTOCOL_DB2.String(false)): L7_PROTOCOL_DB2,
862+
strings.ToLower(L7_PROTOCOL_TDSQL.String(false)): L7_PROTOCOL_TDSQL,
863+
strings.ToLower(L7_PROTOCOL_OCEANBASE.String(false)): L7_PROTOCOL_OCEANBASE,
864+
strings.ToLower(L7_PROTOCOL_GOLDENDB.String(false)): L7_PROTOCOL_GOLDENDB,
865+
strings.ToLower(L7_PROTOCOL_KINGBASE.String(false)): L7_PROTOCOL_KINGBASE,
826866
strings.ToLower(L7_PROTOCOL_ISO8583.String(false)): L7_PROTOCOL_ISO8583,
827867
strings.ToLower(L7_PROTOCOL_NET_SIGN.String(false)): L7_PROTOCOL_NET_SIGN,
828868
strings.ToLower(L7_PROTOCOL_TRIPLE.String(false)): L7_PROTOCOL_TRIPLE,

server/libs/datatype/tag_test.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,3 +45,25 @@ func TestTagEncodeAndDecode(t *testing.T) {
4545
t.Errorf("编解码函数实现错误")
4646
}
4747
}
48+
49+
func TestCommercialDatabaseL7ProtocolMappings(t *testing.T) {
50+
expected := map[string]L7Protocol{
51+
"mysql": L7_PROTOCOL_MYSQL,
52+
"postgresql": L7_PROTOCOL_POSTGRE,
53+
"oracle": L7_PROTOCOL_ORACLE,
54+
"dameng": L7_PROTOCOL_DAMENG,
55+
"db2": L7_PROTOCOL_DB2,
56+
"tdsql": L7_PROTOCOL_TDSQL,
57+
"oceanbase": L7_PROTOCOL_OCEANBASE,
58+
"goldendb": L7_PROTOCOL_GOLDENDB,
59+
"kingbase": L7_PROTOCOL_KINGBASE,
60+
}
61+
for name, protocol := range expected {
62+
if got := L7ProtocolStringMap[name]; got != protocol {
63+
t.Errorf("L7ProtocolStringMap[%q] = %v, want %v", name, got, protocol)
64+
}
65+
if got := protocol.String(false); got == "N/A" {
66+
t.Errorf("L7Protocol(%d).String(false) = %q", protocol, got)
67+
}
68+
}
69+
}

server/libs/flow-metrics/tag.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1003,7 +1003,7 @@ func GenTagColumns(code Code) []*ckdb.Column {
10031003
columns = append(columns, ckdb.NewColumnWithGroupBy("l3_epc_id_1", ckdb.Int32).SetComment("ip4/6_1对应的EPC ID"))
10041004
}
10051005
if code&L7Protocol != 0 {
1006-
columns = append(columns, ckdb.NewColumnWithGroupBy("l7_protocol", ckdb.UInt8).SetComment("应用协议0: unknown, 1: http, 2: dns, 3: mysql, 4: redis, 5: dubbo, 6: kafka"))
1006+
columns = append(columns, ckdb.NewColumnWithGroupBy("l7_protocol", ckdb.UInt8).SetComment("应用协议0: unknown, 20: http1, 21: http2, 40: dubbo, 60: mysql, 61: postgresql, 62: oracle, 63: dameng, 64: db2, 65: tdsql, 66: oceanbase, 67: goldendb, 68: kingbase, 80: redis, 100: kafka"))
10071007
columns = append(columns, ckdb.NewColumnWithGroupBy("app_service", ckdb.LowCardinalityString))
10081008
columns = append(columns, ckdb.NewColumnWithGroupBy("app_instance", ckdb.LowCardinalityString))
10091009
columns = append(columns, ckdb.NewColumnWithGroupBy("endpoint", ckdb.String))

server/querier/db_descriptions/clickhouse/tag/enum/l7_protocol

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,11 @@
1616
61 , PostgreSQL ,
1717
62 , Oracle ,
1818
63 , Dameng ,
19+
64 , DB2 ,
20+
65 , TDSQL ,
21+
66 , OceanBase ,
22+
67 , GoldenDB ,
23+
68 , Kingbase ,
1924
80 , Redis ,
2025
81 , MongoDB ,
2126
82 , Memcached ,
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
# Value , DisplayName , Description
2+
0 , N/A ,
3+
20 , HTTP ,
4+
21 , HTTP2 ,
5+
40 , Dubbo ,
6+
41 , gRPC ,
7+
43 , SofaRPC ,
8+
44 , FastCGI ,
9+
45 , bRPC ,
10+
46 , Tars ,
11+
47 , Some/IP ,
12+
48 , ISO-8583 ,
13+
49 , Triple ,
14+
50 , NetSign ,
15+
60 , MySQL ,
16+
61 , PostgreSQL ,
17+
62 , Oracle ,
18+
63 , 达梦 ,
19+
64 , DB2 ,
20+
65 , TDSQL ,
21+
66 , OceanBase ,
22+
67 , GoldenDB ,
23+
68 , 人大金仓 ,
24+
80 , Redis ,
25+
81 , MongoDB ,
26+
82 , Memcached ,
27+
100 , Kafka ,
28+
101 , MQTT ,
29+
102 , AMQP , RabbitMQ
30+
103 , OpenWire , ActiveMQ
31+
104 , NATS ,
32+
105 , Pulsar ,
33+
106 , ZMTP , ZeroMQ
34+
107 , RocketMQ ,
35+
108 , WebSphereMQ ,
36+
120 , DNS ,
37+
121 , TLS ,
38+
122 , Ping ,
39+
127 , Custom ,
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
# Value , DisplayName , Description
2+
0 , N/A ,
3+
20 , HTTP ,
4+
21 , HTTP2 ,
5+
40 , Dubbo ,
6+
41 , gRPC ,
7+
43 , SofaRPC ,
8+
44 , FastCGI ,
9+
45 , bRPC ,
10+
46 , Tars ,
11+
47 , Some/IP ,
12+
48 , ISO-8583 ,
13+
49 , Triple ,
14+
50 , NetSign ,
15+
60 , MySQL ,
16+
61 , PostgreSQL ,
17+
62 , Oracle ,
18+
63 , Dameng ,
19+
64 , DB2 ,
20+
65 , TDSQL ,
21+
66 , OceanBase ,
22+
67 , GoldenDB ,
23+
68 , Kingbase ,
24+
80 , Redis ,
25+
81 , MongoDB ,
26+
82 , Memcached ,
27+
100 , Kafka ,
28+
101 , MQTT ,
29+
102 , AMQP , RabbitMQ
30+
103 , OpenWire , ActiveMQ
31+
104 , NATS ,
32+
105 , Pulsar ,
33+
106 , ZMTP , ZeroMQ
34+
107 , RocketMQ ,
35+
108 , WebSphereMQ ,
36+
120 , DNS ,
37+
121 , TLS ,
38+
122 , Ping ,
39+
127 , Custom ,

server/querier/engine/clickhouse/clickhouse_test.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import (
3535
"github.com/deepflowio/deepflow/server/querier/config"
3636
"github.com/deepflowio/deepflow/server/querier/engine/clickhouse/client"
3737
"github.com/deepflowio/deepflow/server/querier/engine/clickhouse/metrics"
38+
tagdescription "github.com/deepflowio/deepflow/server/querier/engine/clickhouse/tag"
3839
"github.com/deepflowio/deepflow/server/querier/parse"
3940
)
4041

@@ -692,6 +693,54 @@ func TestGetSql(t *testing.T) {
692693
}
693694
}
694695

696+
func TestL7ProtocolDatabaseEnumCandidates(t *testing.T) {
697+
if err := Load(); err != nil {
698+
t.Fatal(err)
699+
}
700+
701+
enums, ok := tagdescription.TAG_INT_ENUMS["l7_protocol"]
702+
if !ok {
703+
t.Fatal("missing l7_protocol int enum")
704+
}
705+
got := make(map[int]struct {
706+
en string
707+
zh string
708+
}, len(enums))
709+
for _, item := range enums {
710+
value, ok := item.Value.(int)
711+
if !ok {
712+
t.Fatalf("unexpected l7_protocol enum value type %T", item.Value)
713+
}
714+
got[value] = struct {
715+
en string
716+
zh string
717+
}{
718+
en: fmt.Sprint(item.DisplayNameEN),
719+
zh: fmt.Sprint(item.DisplayNameZH),
720+
}
721+
}
722+
723+
expected := map[int]struct {
724+
en string
725+
zh string
726+
}{
727+
60: {en: "MySQL", zh: "MySQL"},
728+
61: {en: "PostgreSQL", zh: "PostgreSQL"},
729+
62: {en: "Oracle", zh: "Oracle"},
730+
63: {en: "Dameng", zh: "达梦"},
731+
64: {en: "DB2", zh: "DB2"},
732+
65: {en: "TDSQL", zh: "TDSQL"},
733+
66: {en: "OceanBase", zh: "OceanBase"},
734+
67: {en: "GoldenDB", zh: "GoldenDB"},
735+
68: {en: "Kingbase", zh: "人大金仓"},
736+
}
737+
for value, want := range expected {
738+
if got[value] != want {
739+
t.Errorf("l7_protocol enum %d = %+v, want %+v", value, got[value], want)
740+
}
741+
}
742+
}
743+
695744
/* func TestGetSqltest(t *testing.T) {
696745
for _, pcase := range parsetest {
697746
e := CHEngine{DB: "flow_log"}

server/querier/engine/clickhouse/tag/description.go

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"fmt"
2323
"regexp"
2424
"slices"
25+
"sort"
2526
"strconv"
2627
"strings"
2728

@@ -156,6 +157,16 @@ func NewTagEnum(value, displayNameZH, displayNameEN, descriptionZH, descriptionE
156157
}
157158
}
158159

160+
func enumFileBaseAndPriority(file string) (string, int) {
161+
if strings.HasSuffix(file, ".ch") {
162+
return strings.TrimSuffix(file, ".ch"), 1
163+
}
164+
if strings.HasSuffix(file, ".en") {
165+
return strings.TrimSuffix(file, ".en"), 2
166+
}
167+
return file, 0
168+
}
169+
159170
func LoadTagDescriptions(tagData map[string]interface{}) error {
160171
// 生成tag description
161172
enumFileToTagType := make(map[string]string)
@@ -241,7 +252,20 @@ func LoadTagDescriptions(tagData map[string]interface{}) error {
241252
tagEnumData, ok := tagData["enum"]
242253
if ok {
243254
tagMap := map[string][][6]interface{}{}
244-
for tagEnumFile, enumData := range tagEnumData.(map[string]interface{}) {
255+
tagEnumFiles := make([]string, 0, len(tagEnumData.(map[string]interface{})))
256+
for tagEnumFile := range tagEnumData.(map[string]interface{}) {
257+
tagEnumFiles = append(tagEnumFiles, tagEnumFile)
258+
}
259+
sort.SliceStable(tagEnumFiles, func(i, j int) bool {
260+
baseI, priorityI := enumFileBaseAndPriority(tagEnumFiles[i])
261+
baseJ, priorityJ := enumFileBaseAndPriority(tagEnumFiles[j])
262+
if baseI == baseJ {
263+
return priorityI < priorityJ
264+
}
265+
return baseI < baseJ
266+
})
267+
for _, tagEnumFile := range tagEnumFiles {
268+
enumData := tagEnumData.(map[string]interface{})[tagEnumFile]
245269
tagName := strings.TrimSuffix(tagEnumFile, ".ch")
246270
tagName = strings.TrimSuffix(tagName, ".en")
247271
values, ok := tagMap[tagName]

0 commit comments

Comments
 (0)