Skip to content

Commit ab751ec

Browse files
fix: compact management events and disable 1-second aggregation
1 parent 553ac18 commit ab751ec

25 files changed

+395
-244
lines changed

server/ingester/event/dbwriter/event.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ type EventStore struct {
6868
ProcessKName string `json:"process_kname" category:"$tag" sub:"service_info"` // us
6969

7070
GProcessID uint32 `json:"gprocess_id" category:"$tag" sub:"universal_tag"`
71+
RootPID uint32
7172

7273
RegionID uint16 `json:"region_id" category:"$tag" sub:"universal_tag"`
7374
AZID uint16 `json:"az_id" category:"$tag" sub:"universal_tag"`
@@ -295,6 +296,12 @@ func GenEventCKTable(cluster, storagePolicy, table, ckdbType string, ttl int, co
295296
partition = DefaultFileEventPartition
296297
}
297298

299+
aggr1S := true
300+
switch table {
301+
case common.FILE_AGG_EVENT.TableName(), common.FILE_MGMT_EVENT.TableName(), common.PROC_PERM_EVENT.TableName(), common.PROC_OPS_EVENT.TableName():
302+
aggr1S = false
303+
}
304+
298305
return &ckdb.Table{
299306
Version: basecommon.CK_VERSION,
300307
Database: EVENT_DB,
@@ -311,7 +318,7 @@ func GenEventCKTable(cluster, storagePolicy, table, ckdbType string, ttl int, co
311318
ColdStorage: *coldStorage,
312319
OrderKeys: orderKeys,
313320
PrimaryKeyCount: len(orderKeys),
314-
Aggr1S: true,
321+
Aggr1S: aggr1S,
315322
AggrTableSuffix: "_metrics",
316323
AggrCounted: true,
317324
}

server/ingester/event/dbwriter/event_test.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55

66
eventcommon "github.com/deepflowio/deepflow/server/ingester/event/common"
77
exporterconfig "github.com/deepflowio/deepflow/server/ingester/exporters/config"
8+
"github.com/deepflowio/deepflow/server/libs/ckdb"
89
)
910

1011
func TestEventStoreDataSourceForNewAiAgentTables(t *testing.T) {
@@ -48,3 +49,25 @@ func TestEventStoreDataSourceForNewAiAgentTables(t *testing.T) {
4849
})
4950
}
5051
}
52+
53+
func TestGenEventCKTableDisables1SAggrForNewAiAgentTables(t *testing.T) {
54+
tests := []struct {
55+
table string
56+
want bool
57+
}{
58+
{eventcommon.FILE_EVENT.TableName(), true},
59+
{eventcommon.FILE_AGG_EVENT.TableName(), false},
60+
{eventcommon.FILE_MGMT_EVENT.TableName(), false},
61+
{eventcommon.PROC_PERM_EVENT.TableName(), false},
62+
{eventcommon.PROC_OPS_EVENT.TableName(), false},
63+
}
64+
65+
for _, tt := range tests {
66+
t.Run(tt.table, func(t *testing.T) {
67+
table := GenEventCKTable("test_cluster", "policy", tt.table, "clickhouse", 24, &ckdb.ColdStorage{})
68+
if table.Aggr1S != tt.want {
69+
t.Fatalf("table %s Aggr1S = %v, want %v", tt.table, table.Aggr1S, tt.want)
70+
}
71+
})
72+
}
73+
}

server/ingester/event/dbwriter/file_agg_event.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,9 @@ func (e *FileAggEventStore) Release() {
5050

5151
func FileAggEventColumns() []*ckdb.Column {
5252
columns := EventColumns(true)
53-
columns = append(columns, ckdb.NewColumn("event_count", ckdb.UInt32).SetAggrSum().SetAggrSum())
53+
columns = append(columns,
54+
ckdb.NewColumn("root_pid", ckdb.UInt32).SetGroupBy(),
55+
ckdb.NewColumn("event_count", ckdb.UInt32).SetAggrSum().SetAggrSum(),
56+
)
5457
return columns
5558
}

server/ingester/event/dbwriter/file_agg_event_column_block.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,17 +7,22 @@ import (
77

88
type FileAggEventBlock struct {
99
EventBlock
10+
ColRootPID proto.ColUInt32
1011
ColEventCount proto.ColUInt32
1112
}
1213

1314
func (b *FileAggEventBlock) Reset() {
1415
b.EventBlock.Reset()
16+
b.ColRootPID.Reset()
1517
b.ColEventCount.Reset()
1618
}
1719

1820
func (b *FileAggEventBlock) ToInput(input proto.Input) proto.Input {
1921
input = b.EventBlock.ToInput(input)
20-
return append(input, proto.InputColumn{Name: "event_count", Data: &b.ColEventCount})
22+
return append(input,
23+
proto.InputColumn{Name: "root_pid", Data: &b.ColRootPID},
24+
proto.InputColumn{Name: "event_count", Data: &b.ColEventCount},
25+
)
2126
}
2227

2328
func (n *FileAggEventStore) NewColumnBlock() ckdb.CKColumnBlock {
@@ -28,5 +33,6 @@ func (n *FileAggEventStore) NewColumnBlock() ckdb.CKColumnBlock {
2833
func (n *FileAggEventStore) AppendToColumnBlock(b ckdb.CKColumnBlock) {
2934
block := b.(*FileAggEventBlock)
3035
n.EventStore.AppendToColumnBlock(&block.EventBlock)
36+
block.ColRootPID.Append(n.RootPID)
3137
block.ColEventCount.Append(n.EventCount)
3238
}

server/ingester/event/dbwriter/file_mgmt_event.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ func (e *FileMgmtEventStore) Release() {
5353
func FileMgmtEventColumns() []*ckdb.Column {
5454
columns := EventColumns(true)
5555
columns = append(columns,
56+
ckdb.NewColumn("root_pid", ckdb.UInt32).SetGroupBy(),
5657
ckdb.NewColumn("target_uid", ckdb.UInt32).SetGroupBy(),
5758
ckdb.NewColumn("target_gid", ckdb.UInt32).SetGroupBy(),
5859
ckdb.NewColumn("target_mode", ckdb.UInt32).SetGroupBy(),

server/ingester/event/dbwriter/file_mgmt_event_column_block.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,15 @@ import (
77

88
type FileMgmtEventBlock struct {
99
EventBlock
10+
ColRootPID proto.ColUInt32
1011
ColTargetUID proto.ColUInt32
1112
ColTargetGID proto.ColUInt32
1213
ColTargetMode proto.ColUInt32
1314
}
1415

1516
func (b *FileMgmtEventBlock) Reset() {
1617
b.EventBlock.Reset()
18+
b.ColRootPID.Reset()
1719
b.ColTargetUID.Reset()
1820
b.ColTargetGID.Reset()
1921
b.ColTargetMode.Reset()
@@ -22,6 +24,7 @@ func (b *FileMgmtEventBlock) Reset() {
2224
func (b *FileMgmtEventBlock) ToInput(input proto.Input) proto.Input {
2325
input = b.EventBlock.ToInput(input)
2426
return append(input,
27+
proto.InputColumn{Name: "root_pid", Data: &b.ColRootPID},
2528
proto.InputColumn{Name: "target_uid", Data: &b.ColTargetUID},
2629
proto.InputColumn{Name: "target_gid", Data: &b.ColTargetGID},
2730
proto.InputColumn{Name: "target_mode", Data: &b.ColTargetMode},
@@ -36,6 +39,7 @@ func (n *FileMgmtEventStore) NewColumnBlock() ckdb.CKColumnBlock {
3639
func (n *FileMgmtEventStore) AppendToColumnBlock(b ckdb.CKColumnBlock) {
3740
block := b.(*FileMgmtEventBlock)
3841
n.EventStore.AppendToColumnBlock(&block.EventBlock)
42+
block.ColRootPID.Append(n.RootPID)
3943
block.ColTargetUID.Append(n.TargetUID)
4044
block.ColTargetGID.Append(n.TargetGID)
4145
block.ColTargetMode.Append(n.TargetMode)

server/ingester/event/dbwriter/proc_ops_event.go

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,12 @@ var procOpsEventPool = pool.NewLockFreePool(func() *ProcOpsEventStore {
1919
type ProcOpsEventStore struct {
2020
EventStore
2121

22-
Pid uint32
23-
ParentPid uint32
24-
AiAgentRootPid uint32
25-
UID uint32
26-
GID uint32
27-
Cmdline string
28-
ExecPath string
22+
Pid uint32
23+
ParentPid uint32
24+
UID uint32
25+
GID uint32
26+
Cmdline string
27+
ExecPath string
2928
}
3029

3130
func AcquireProcOpsEventStore() *ProcOpsEventStore {
@@ -57,7 +56,7 @@ func ProcOpsEventColumns() []*ckdb.Column {
5756
columns = append(columns,
5857
ckdb.NewColumn("pid", ckdb.UInt32).SetGroupBy(),
5958
ckdb.NewColumn("parent_pid", ckdb.UInt32).SetGroupBy(),
60-
ckdb.NewColumn("ai_agent_root_pid", ckdb.UInt32).SetGroupBy(),
59+
ckdb.NewColumn("root_pid", ckdb.UInt32).SetGroupBy(),
6160
ckdb.NewColumn("uid", ckdb.UInt32).SetGroupBy(),
6261
ckdb.NewColumn("gid", ckdb.UInt32).SetGroupBy(),
6362
ckdb.NewColumn("cmdline", ckdb.String).SetIgnoredInAggrTable(),

server/ingester/event/dbwriter/proc_ops_event_column_block.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ type ProcOpsEventBlock struct {
99
EventBlock
1010
ColPid proto.ColUInt32
1111
ColParentPid proto.ColUInt32
12-
ColAiAgentRootPid proto.ColUInt32
12+
ColRootPID proto.ColUInt32
1313
ColUID proto.ColUInt32
1414
ColGID proto.ColUInt32
1515
ColCmdline proto.ColStr
@@ -22,7 +22,7 @@ func (b *ProcOpsEventBlock) Reset() {
2222
b.EventBlock.Reset()
2323
b.ColPid.Reset()
2424
b.ColParentPid.Reset()
25-
b.ColAiAgentRootPid.Reset()
25+
b.ColRootPID.Reset()
2626
b.ColUID.Reset()
2727
b.ColGID.Reset()
2828
b.ColCmdline.Reset()
@@ -36,7 +36,7 @@ func (b *ProcOpsEventBlock) ToInput(input proto.Input) proto.Input {
3636
return append(input,
3737
proto.InputColumn{Name: "pid", Data: &b.ColPid},
3838
proto.InputColumn{Name: "parent_pid", Data: &b.ColParentPid},
39-
proto.InputColumn{Name: "ai_agent_root_pid", Data: &b.ColAiAgentRootPid},
39+
proto.InputColumn{Name: "root_pid", Data: &b.ColRootPID},
4040
proto.InputColumn{Name: "uid", Data: &b.ColUID},
4141
proto.InputColumn{Name: "gid", Data: &b.ColGID},
4242
proto.InputColumn{Name: "cmdline", Data: &b.ColCmdline},
@@ -57,7 +57,7 @@ func (n *ProcOpsEventStore) AppendToColumnBlock(b ckdb.CKColumnBlock) {
5757
n.EventStore.AppendToColumnBlock(&block.EventBlock)
5858
block.ColPid.Append(n.Pid)
5959
block.ColParentPid.Append(n.ParentPid)
60-
block.ColAiAgentRootPid.Append(n.AiAgentRootPid)
60+
block.ColRootPID.Append(n.RootPID)
6161
block.ColUID.Append(n.UID)
6262
block.ColGID.Append(n.GID)
6363
block.ColCmdline.Append(n.Cmdline)

server/ingester/event/dbwriter/proc_perm_event.go

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,11 @@ var procPermEventPool = pool.NewLockFreePool(func() *ProcPermEventStore {
1919
type ProcPermEventStore struct {
2020
EventStore
2121

22-
Pid uint32
23-
AiAgentRootPid uint32
24-
OldUID uint32
25-
OldGID uint32
26-
NewUID uint32
27-
NewGID uint32
22+
Pid uint32
23+
OldUID uint32
24+
OldGID uint32
25+
NewUID uint32
26+
NewGID uint32
2827
}
2928

3029
func AcquireProcPermEventStore() *ProcPermEventStore {
@@ -55,7 +54,7 @@ func ProcPermEventColumns() []*ckdb.Column {
5554
columns := EventColumns(false)
5655
columns = append(columns,
5756
ckdb.NewColumn("pid", ckdb.UInt32).SetGroupBy(),
58-
ckdb.NewColumn("ai_agent_root_pid", ckdb.UInt32).SetGroupBy(),
57+
ckdb.NewColumn("root_pid", ckdb.UInt32).SetGroupBy(),
5958
ckdb.NewColumn("old_uid", ckdb.UInt32).SetGroupBy(),
6059
ckdb.NewColumn("old_gid", ckdb.UInt32).SetGroupBy(),
6160
ckdb.NewColumn("new_uid", ckdb.UInt32).SetGroupBy(),

server/ingester/event/dbwriter/proc_perm_event_column_block.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import (
88
type ProcPermEventBlock struct {
99
EventBlock
1010
ColPid proto.ColUInt32
11-
ColAiAgentRootPid proto.ColUInt32
11+
ColRootPID proto.ColUInt32
1212
ColOldUID proto.ColUInt32
1313
ColOldGID proto.ColUInt32
1414
ColNewUID proto.ColUInt32
@@ -20,7 +20,7 @@ type ProcPermEventBlock struct {
2020
func (b *ProcPermEventBlock) Reset() {
2121
b.EventBlock.Reset()
2222
b.ColPid.Reset()
23-
b.ColAiAgentRootPid.Reset()
23+
b.ColRootPID.Reset()
2424
b.ColOldUID.Reset()
2525
b.ColOldGID.Reset()
2626
b.ColNewUID.Reset()
@@ -33,7 +33,7 @@ func (b *ProcPermEventBlock) ToInput(input proto.Input) proto.Input {
3333
input = b.EventBlock.ToInput(input)
3434
return append(input,
3535
proto.InputColumn{Name: "pid", Data: &b.ColPid},
36-
proto.InputColumn{Name: "ai_agent_root_pid", Data: &b.ColAiAgentRootPid},
36+
proto.InputColumn{Name: "root_pid", Data: &b.ColRootPID},
3737
proto.InputColumn{Name: "old_uid", Data: &b.ColOldUID},
3838
proto.InputColumn{Name: "old_gid", Data: &b.ColOldGID},
3939
proto.InputColumn{Name: "new_uid", Data: &b.ColNewUID},
@@ -53,7 +53,7 @@ func (n *ProcPermEventStore) AppendToColumnBlock(b ckdb.CKColumnBlock) {
5353
block := b.(*ProcPermEventBlock)
5454
n.EventStore.AppendToColumnBlock(&block.EventBlock)
5555
block.ColPid.Append(n.Pid)
56-
block.ColAiAgentRootPid.Append(n.AiAgentRootPid)
56+
block.ColRootPID.Append(n.RootPID)
5757
block.ColOldUID.Append(n.OldUID)
5858
block.ColOldGID.Append(n.OldGID)
5959
block.ColNewUID.Append(n.NewUID)

0 commit comments

Comments
 (0)