Skip to content

Commit 1b57af5

Browse files
feat: link AI event data source retention updates
1 parent af35e3b commit 1b57af5

File tree

3 files changed

+400
-23
lines changed

3 files changed

+400
-23
lines changed

server/controller/db/metadb/migrator/schema/schema_regression_test.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,27 @@ func TestDBVersionExpectedMatchesHighestMySQLIssue(t *testing.T) {
3434
}
3535
}
3636

37+
func TestMySQLDMLInsert_AIAgentEventDataSourcesShareDefaultRetention(t *testing.T) {
38+
sqlPath := filepath.Join("rawsql", "mysql", "dml_insert.sql")
39+
content, err := os.ReadFile(sqlPath)
40+
if err != nil {
41+
t.Fatalf("read dml_insert sql failed: %v", err)
42+
}
43+
sql := string(content)
44+
45+
required := []string{
46+
"VALUES (27, '事件-文件读写聚合事件', 'event.file_agg_event', 0, 7*24, @lcuuid);",
47+
"VALUES (28, '事件-文件管理事件', 'event.file_mgmt_event', 0, 7*24, @lcuuid);",
48+
"VALUES (29, '事件-进程权限事件', 'event.proc_perm_event', 0, 7*24, @lcuuid);",
49+
"VALUES (30, '事件-进程操作事件', 'event.proc_ops_event', 0, 7*24, @lcuuid);",
50+
}
51+
for _, item := range required {
52+
if !strings.Contains(sql, item) {
53+
t.Fatalf("missing AI event data source default retention entry: %s", item)
54+
}
55+
}
56+
}
57+
3758
func versionGreater(left, right string) bool {
3859
leftParts := strings.Split(left, ".")
3960
rightParts := strings.Split(right, ".")

server/controller/http/service/data_source.go

Lines changed: 81 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,37 @@ var DEFAULT_DATA_SOURCE_DISPLAY_NAMES = []string{
108108
"日志-日志数据", // application_log.log
109109
}
110110

111+
var aiAgentRetentionCollections = []string{
112+
"event.file_agg_event",
113+
"event.file_mgmt_event",
114+
"event.proc_perm_event",
115+
"event.proc_ops_event",
116+
}
117+
118+
func linkedRetentionCollections(collection string) []string {
119+
for _, item := range aiAgentRetentionCollections {
120+
if item == collection {
121+
return append([]string(nil), aiAgentRetentionCollections...)
122+
}
123+
}
124+
return nil
125+
}
126+
127+
func resolveRetentionTargets(current metadbmodel.DataSource, all []metadbmodel.DataSource) []metadbmodel.DataSource {
128+
linkedCollections := linkedRetentionCollections(current.DataTableCollection)
129+
if len(linkedCollections) == 0 {
130+
return []metadbmodel.DataSource{current}
131+
}
132+
133+
targets := make([]metadbmodel.DataSource, 0, len(linkedCollections))
134+
for _, dataSource := range all {
135+
if utils.Find(linkedCollections, dataSource.DataTableCollection) {
136+
targets = append(targets, dataSource)
137+
}
138+
}
139+
return targets
140+
}
141+
111142
func (d *DataSource) GetDataSources(orgID int, filter map[string]interface{}, specCfg *config.Specification) (resp []model.DataSource, err error) {
112143
dbInfo, err := metadb.GetDB(orgID)
113144
if err != nil {
@@ -409,8 +440,29 @@ func (d *DataSource) UpdateDataSource(orgID int, lcuuid string, dataSourceUpdate
409440
return response[0], nil
410441
}
411442

412-
oldRetentionTime := dataSource.RetentionTime
443+
targets := []metadbmodel.DataSource{dataSource}
444+
if linkedCollections := linkedRetentionCollections(dataSource.DataTableCollection); len(linkedCollections) > 0 {
445+
var grouped []metadbmodel.DataSource
446+
if err := db.Where("data_table_collection IN ?", linkedCollections).
447+
Order("id ASC").
448+
Find(&grouped).Error; err != nil {
449+
return model.DataSource{}, err
450+
}
451+
targets = resolveRetentionTargets(dataSource, grouped)
452+
if len(targets) != len(linkedCollections) {
453+
return model.DataSource{}, response.ServiceError(
454+
httpcommon.SERVER_ERROR,
455+
fmt.Sprintf("linked retention data sources incomplete for %s", dataSource.DataTableCollection),
456+
)
457+
}
458+
}
459+
460+
oldRetentionTimes := make(map[string]int, len(targets))
413461
if dataSourceUpdate.RetentionTime != nil {
462+
for i := range targets {
463+
oldRetentionTimes[targets[i].Lcuuid] = targets[i].RetentionTime
464+
targets[i].RetentionTime = *dataSourceUpdate.RetentionTime
465+
}
414466
dataSource.RetentionTime = *dataSourceUpdate.RetentionTime
415467
}
416468

@@ -421,29 +473,33 @@ func (d *DataSource) UpdateDataSource(orgID int, lcuuid string, dataSourceUpdate
421473
}
422474
var errs []error
423475
for _, analyzer := range analyzers {
424-
err = d.CallIngesterAPIModRP(orgID, analyzer.IP, dataSource)
425-
if err != nil {
426-
errs = append(errs, fmt.Errorf(
427-
"failed to config analyzer (name: %s, ip:%s) update data_source (%s) error: %w",
428-
analyzer.Name, analyzer.IP, dataSource.DisplayName, err,
429-
))
430-
continue
476+
for _, target := range targets {
477+
err = d.CallIngesterAPIModRP(orgID, analyzer.IP, target)
478+
if err != nil {
479+
errs = append(errs, fmt.Errorf(
480+
"failed to config analyzer (name: %s, ip:%s) update data_source (%s) error: %w",
481+
analyzer.Name, analyzer.IP, target.DisplayName, err,
482+
))
483+
continue
484+
}
485+
log.Infof("config analyzer (%s) mod data_source (%s) complete, retention time change: %ds -> %ds",
486+
analyzer.IP, target.DisplayName, oldRetentionTimes[target.Lcuuid], target.RetentionTime, dbInfo.LogPrefixORGID)
431487
}
432-
log.Infof("config analyzer (%s) mod data_source (%s) complete, retention time change: %ds -> %ds",
433-
analyzer.IP, dataSource.DisplayName, oldRetentionTime, dataSource.RetentionTime, dbInfo.LogPrefixORGID)
434488
}
435489

436490
if len(errs) == 0 {
437-
if err := db.Model(&dataSource).Updates(
438-
map[string]interface{}{
439-
"state": common.DATA_SOURCE_STATE_NORMAL,
440-
"retention_time": dataSource.RetentionTime,
441-
},
442-
).Error; err != nil {
443-
return model.DataSource{}, err
491+
for _, target := range targets {
492+
if err := db.Model(&target).Updates(
493+
map[string]interface{}{
494+
"state": common.DATA_SOURCE_STATE_NORMAL,
495+
"retention_time": target.RetentionTime,
496+
},
497+
).Error; err != nil {
498+
return model.DataSource{}, err
499+
}
500+
log.Infof("update data_source (%s), retention time change: %ds -> %ds",
501+
target.DisplayName, oldRetentionTimes[target.Lcuuid], target.RetentionTime, dbInfo.LogPrefixORGID)
444502
}
445-
log.Infof("update data_source (%s), retention time change: %ds -> %ds",
446-
dataSource.DisplayName, oldRetentionTime, dataSource.RetentionTime, dbInfo.LogPrefixORGID)
447503
}
448504
var errStrs []string
449505
for _, e := range errs {
@@ -453,10 +509,12 @@ func (d *DataSource) UpdateDataSource(orgID int, lcuuid string, dataSourceUpdate
453509

454510
for _, e := range errs {
455511
if errors.Is(e, httpcommon.ErrorFail) {
456-
if err := db.Model(&dataSource).Updates(
457-
map[string]interface{}{"state": common.DATA_SOURCE_STATE_EXCEPTION},
458-
).Error; err != nil {
459-
return model.DataSource{}, err
512+
for _, target := range targets {
513+
if err := db.Model(&target).Updates(
514+
map[string]interface{}{"state": common.DATA_SOURCE_STATE_EXCEPTION},
515+
).Error; err != nil {
516+
return model.DataSource{}, err
517+
}
460518
}
461519
err = response.ServiceError(httpcommon.PARTIAL_CONTENT, errMsg)
462520
break

0 commit comments

Comments
 (0)