Skip to content

Commit 0d33235

Browse files
committed
feat(db): 添加数据库去重工具和优化索引创建
perf(scheduler): 优化流量调度器重试机制和时间对齐 feat(config): 调整 SQLite 连接配置和自动去重选项
1 parent d75eefd commit 0d33235

7 files changed

Lines changed: 806 additions & 210 deletions

File tree

cmd/dbtool/main.go

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
package main
2+
3+
import (
4+
dbPkg "NodePassDash/internal/db"
5+
log "NodePassDash/internal/log"
6+
"flag"
7+
"fmt"
8+
"os"
9+
"time"
10+
11+
"gorm.io/gorm"
12+
)
13+
14+
func main() {
15+
dedupTrafficHourly := flag.Bool("dedup-traffic-hourly", false, "去重 traffic_hourly_summary 并创建唯一索引")
16+
onlyCreateIndexes := flag.Bool("create-indexes", false, "仅创建优化索引(不去重)")
17+
flag.Parse()
18+
19+
if !*dedupTrafficHourly && !*onlyCreateIndexes {
20+
fmt.Fprintln(os.Stderr, "Usage:")
21+
fmt.Fprintln(os.Stderr, " dbtool --dedup-traffic-hourly")
22+
fmt.Fprintln(os.Stderr, " dbtool --create-indexes")
23+
os.Exit(2)
24+
}
25+
26+
db := dbPkg.GetDB()
27+
if db == nil {
28+
log.Error("获取数据库连接失败")
29+
os.Exit(1)
30+
}
31+
32+
start := time.Now()
33+
34+
if *dedupTrafficHourly {
35+
log.Info("[dbtool]开始去重 traffic_hourly_summary ...")
36+
if err := db.Transaction(func(tx *gorm.DB) error {
37+
res := tx.Exec(`
38+
DELETE FROM traffic_hourly_summary
39+
WHERE id NOT IN (
40+
SELECT MAX(id)
41+
FROM traffic_hourly_summary
42+
GROUP BY hour_time, endpoint_id, instance_id
43+
)
44+
`)
45+
if res.Error != nil {
46+
return res.Error
47+
}
48+
log.Infof("[dbtool]traffic_hourly_summary 去重删除 %d 行", res.RowsAffected)
49+
return nil
50+
}); err != nil {
51+
log.Errorf("[dbtool]去重失败: %v", err)
52+
os.Exit(1)
53+
}
54+
}
55+
56+
if *dedupTrafficHourly || *onlyCreateIndexes {
57+
log.Info("[dbtool]创建/校验索引 ...")
58+
if err := db.Transaction(func(tx *gorm.DB) error {
59+
stmts := []string{
60+
"CREATE UNIQUE INDEX IF NOT EXISTS uniq_traffic_hourly_summary_hour_endpoint_instance ON traffic_hourly_summary(hour_time, endpoint_id, instance_id)",
61+
"CREATE INDEX IF NOT EXISTS idx_service_history_instance_time ON service_history(instance_id, record_time)",
62+
"CREATE INDEX IF NOT EXISTS idx_service_history_endpoint_instance_time ON service_history(endpoint_id, instance_id, record_time)",
63+
}
64+
for _, sql := range stmts {
65+
if err := tx.Exec(sql).Error; err != nil {
66+
return err
67+
}
68+
}
69+
return nil
70+
}); err != nil {
71+
log.Errorf("[dbtool]创建索引失败: %v", err)
72+
os.Exit(1)
73+
}
74+
}
75+
76+
log.Infof("[dbtool]完成,耗时: %v", time.Since(start))
77+
}

internal/dashboard/scheduler.go

Lines changed: 40 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ type TrafficScheduler struct {
1313
db *gorm.DB
1414
trafficService *TrafficService
1515
cleanupService *CleanupService
16-
ticker *time.Ticker
1716
ctx context.Context
1817
cancel context.CancelFunc
1918
}
@@ -35,32 +34,51 @@ func NewTrafficScheduler(db *gorm.DB) *TrafficScheduler {
3534
func (s *TrafficScheduler) Start() {
3635
log.Println("[流量调度器] 启动定时任务...")
3736

38-
// 每小时执行一次数据聚合
39-
s.ticker = time.NewTicker(1 * time.Hour)
40-
4137
// 立即执行一次初始化,汇总最近24小时的数据
4238
go func() {
39+
// 启动后稍作延迟,减少与 SSE 初始写入的锁竞争
40+
time.Sleep(10 * time.Second)
41+
4342
log.Println("[流量调度器] 开始初始化最近24小时流量汇总数据...")
4443
start := time.Now()
4544

46-
if err := s.trafficService.InitializeRecentTrafficData(); err != nil {
47-
log.Printf("[流量调度器] 初始化24小时汇总数据失败: %v", err)
45+
// 对 SQLite locked 做有限重试
46+
var err error
47+
for attempt := 1; attempt <= 5; attempt++ {
48+
err = s.trafficService.InitializeRecentTrafficData()
49+
if err == nil {
50+
break
51+
}
52+
log.Printf("[流量调度器] 初始化24小时汇总数据失败(第%d次): %v", attempt, err)
53+
time.Sleep(time.Duration(attempt) * 2 * time.Second)
54+
}
55+
56+
if err != nil {
57+
log.Printf("[流量调度器] 初始化24小时汇总数据最终失败: %v", err)
4858
} else {
4959
duration := time.Since(start)
5060
log.Printf("[流量调度器] 初始化24小时汇总数据完成,耗时: %v", duration)
5161
}
5262

5363
// 然后执行上一小时的常规聚合(如果有遗漏)
5464
log.Println("[流量调度器] 执行启动时常规数据聚合...")
55-
if err := s.trafficService.AggregateTrafficData(); err != nil {
56-
log.Printf("[流量调度器] 启动时常规数据聚合失败: %v", err)
65+
for attempt := 1; attempt <= 5; attempt++ {
66+
err = s.trafficService.AggregateTrafficData()
67+
if err == nil {
68+
break
69+
}
70+
log.Printf("[流量调度器] 启动时常规数据聚合失败(第%d次): %v", attempt, err)
71+
time.Sleep(time.Duration(attempt) * 2 * time.Second)
72+
}
73+
if err != nil {
74+
log.Printf("[流量调度器] 启动时常规数据聚合最终失败: %v", err)
5775
} else {
5876
log.Println("[流量调度器] 启动时常规数据聚合完成")
5977
}
6078
}()
6179

6280
// 启动定时任务
63-
go s.run()
81+
go s.runAligned()
6482

6583
// 启动数据清理任务(每天凌晨3点执行)
6684
go s.runCleanupTask()
@@ -72,22 +90,28 @@ func (s *TrafficScheduler) Start() {
7290
func (s *TrafficScheduler) Stop() {
7391
log.Println("[流量调度器] 停止定时任务...")
7492

75-
if s.ticker != nil {
76-
s.ticker.Stop()
77-
}
78-
7993
s.cancel()
8094
log.Println("[流量调度器] 定时任务已停止")
8195
}
8296

83-
// run 运行主要的聚合任务
84-
func (s *TrafficScheduler) run() {
97+
// runAligned 在每个整点后延迟一小段时间执行聚合,减少与分钟写入尖峰的冲突
98+
func (s *TrafficScheduler) runAligned() {
99+
const delayAfterHour = 2 * time.Minute
100+
101+
now := time.Now()
102+
nextRun := now.Truncate(time.Hour).Add(1 * time.Hour).Add(delayAfterHour)
103+
timer := time.NewTimer(time.Until(nextRun))
104+
defer timer.Stop()
105+
85106
for {
86107
select {
87108
case <-s.ctx.Done():
88109
return
89-
case <-s.ticker.C:
110+
case <-timer.C:
90111
s.executeAggregation()
112+
113+
nextRun = nextRun.Add(1 * time.Hour)
114+
timer.Reset(time.Until(nextRun))
91115
}
92116
}
93117
}

0 commit comments

Comments
 (0)